acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5use action_log::{ActionLog, ActionLogTelemetry};
   6use agent_client_protocol::{self as acp};
   7use anyhow::{Context as _, Result, anyhow};
   8use collections::HashSet;
   9pub use connection::*;
  10pub use diff::*;
  11use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  12use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  13use itertools::Itertools;
  14use language::language_settings::FormatOnSave;
  15use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  16use markdown::Markdown;
  17pub use mention::*;
  18use project::lsp_store::{FormatTrigger, LspFormatTarget};
  19use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  20use serde::{Deserialize, Serialize};
  21use serde_json::to_string_pretty;
  22use std::collections::HashMap;
  23use std::error::Error;
  24use std::fmt::{Formatter, Write};
  25use std::ops::Range;
  26use std::process::ExitStatus;
  27use std::rc::Rc;
  28use std::time::{Duration, Instant};
  29use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  30use task::{Shell, ShellBuilder};
  31pub use terminal::*;
  32use text::Bias;
  33use ui::App;
  34use util::markdown::MarkdownEscaped;
  35use util::path_list::PathList;
  36use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
  37use uuid::Uuid;
  38
/// Key used in ACP ToolCall meta to store the tool's programmatic name.
/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
///
/// Read by [`tool_name_from_meta`] and written by [`meta_with_tool_name`].
pub const TOOL_NAME_META_KEY: &str = "tool_name";
  42
  43/// Helper to extract tool name from ACP meta
  44pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
  45    meta.as_ref()
  46        .and_then(|m| m.get(TOOL_NAME_META_KEY))
  47        .and_then(|v| v.as_str())
  48        .map(|s| SharedString::from(s.to_owned()))
  49}
  50
  51/// Helper to create meta with tool name
  52pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
  53    acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
  54}
  55
/// Key used in ACP ToolCall meta to store the session id and message indexes
/// of a subagent, serialized as a [`SubagentSessionInfo`].
///
/// Read by [`subagent_session_info_from_meta`].
pub const SUBAGENT_SESSION_INFO_META_KEY: &str = "subagent_session_info";
  58
/// Subagent bookkeeping carried in a tool call's ACP meta under
/// [`SUBAGENT_SESSION_INFO_META_KEY`].
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SubagentSessionInfo {
    /// The session id of the subagent session that was spawned
    pub session_id: acp::SessionId,
    /// The index of the message of the start of the "turn" run by this tool call
    pub message_start_index: usize,
    /// The index of the output of the message that the subagent has returned.
    /// Omitted from the serialized form while it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message_end_index: Option<usize>,
}
  69
  70/// Helper to extract subagent session id from ACP meta
  71pub fn subagent_session_info_from_meta(meta: &Option<acp::Meta>) -> Option<SubagentSessionInfo> {
  72    meta.as_ref()
  73        .and_then(|m| m.get(SUBAGENT_SESSION_INFO_META_KEY))
  74        .and_then(|v| serde_json::from_value(v.clone()).ok())
  75}
  76
/// A user-authored entry in the thread.
#[derive(Debug)]
pub struct UserMessage {
    /// Optional identifier for this message.
    pub id: Option<UserMessageId>,
    /// The content that `to_markdown` renders for this message.
    pub content: ContentBlock,
    /// ACP content blocks associated with this message.
    // NOTE(review): presumably the raw blocks `content` was built from — confirm against the constructor.
    pub chunks: Vec<acp::ContentBlock>,
    /// Optional checkpoint attached to this message. When its `show` flag is
    /// set, `to_markdown` labels the section `## User (checkpoint)`.
    pub checkpoint: Option<Checkpoint>,
    /// Value reported by `AgentThreadEntry::is_indented` for this entry.
    pub indented: bool,
}
  85
/// A git checkpoint attached to a [`UserMessage`].
#[derive(Debug)]
pub struct Checkpoint {
    /// The underlying checkpoint handle from the project's git store.
    git_checkpoint: GitStoreCheckpoint,
    /// When `true`, `UserMessage::to_markdown` uses the
    /// `## User (checkpoint)` heading instead of `## User`.
    pub show: bool,
}
  91
  92impl UserMessage {
  93    fn to_markdown(&self, cx: &App) -> String {
  94        let mut markdown = String::new();
  95        if self
  96            .checkpoint
  97            .as_ref()
  98            .is_some_and(|checkpoint| checkpoint.show)
  99        {
 100            writeln!(markdown, "## User (checkpoint)").unwrap();
 101        } else {
 102            writeln!(markdown, "## User").unwrap();
 103        }
 104        writeln!(markdown).unwrap();
 105        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
 106        writeln!(markdown).unwrap();
 107        markdown
 108    }
 109}
 110
/// An assistant-authored entry in the thread, made of one or more chunks.
#[derive(Debug, PartialEq)]
pub struct AssistantMessage {
    /// The message and thought chunks, rendered in order by `to_markdown`.
    pub chunks: Vec<AssistantMessageChunk>,
    /// Value reported by `AgentThreadEntry::is_indented` for this entry.
    pub indented: bool,
    /// Flag marking this message as subagent output.
    // NOTE(review): not read anywhere in the visible part of the file — confirm how callers use it.
    pub is_subagent_output: bool,
}
 117
 118impl AssistantMessage {
 119    pub fn to_markdown(&self, cx: &App) -> String {
 120        format!(
 121            "## Assistant\n\n{}\n\n",
 122            self.chunks
 123                .iter()
 124                .map(|chunk| chunk.to_markdown(cx))
 125                .join("\n\n")
 126        )
 127    }
 128}
 129
/// One piece of an [`AssistantMessage`].
#[derive(Debug, PartialEq)]
pub enum AssistantMessageChunk {
    /// Regular message content; rendered as-is by `to_markdown`.
    Message { block: ContentBlock },
    /// Thought content; `to_markdown` wraps it in `<thinking>` tags.
    Thought { block: ContentBlock },
}
 135
 136impl AssistantMessageChunk {
 137    pub fn from_str(
 138        chunk: &str,
 139        language_registry: &Arc<LanguageRegistry>,
 140        path_style: PathStyle,
 141        cx: &mut App,
 142    ) -> Self {
 143        Self::Message {
 144            block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
 145        }
 146    }
 147
 148    fn to_markdown(&self, cx: &App) -> String {
 149        match self {
 150            Self::Message { block } => block.to_markdown(cx).to_string(),
 151            Self::Thought { block } => {
 152                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 153            }
 154        }
 155    }
 156}
 157
/// A single entry in the thread.
#[derive(Debug)]
pub enum AgentThreadEntry {
    /// A message authored by the user.
    UserMessage(UserMessage),
    /// A message authored by the assistant.
    AssistantMessage(AssistantMessage),
    /// A tool invocation together with its content and status.
    ToolCall(ToolCall),
    /// Plan entries; `to_markdown` renders each one as a checked (`- [x]`) item.
    CompletedPlan(Vec<PlanEntry>),
}
 165
 166impl AgentThreadEntry {
 167    pub fn is_indented(&self) -> bool {
 168        match self {
 169            Self::UserMessage(message) => message.indented,
 170            Self::AssistantMessage(message) => message.indented,
 171            Self::ToolCall(_) => false,
 172            Self::CompletedPlan(_) => false,
 173        }
 174    }
 175
 176    pub fn to_markdown(&self, cx: &App) -> String {
 177        match self {
 178            Self::UserMessage(message) => message.to_markdown(cx),
 179            Self::AssistantMessage(message) => message.to_markdown(cx),
 180            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 181            Self::CompletedPlan(entries) => {
 182                let mut md = String::from("## Plan\n\n");
 183                for entry in entries {
 184                    let source = entry.content.read(cx).source().to_string();
 185                    md.push_str(&format!("- [x] {}\n", source));
 186                }
 187                md
 188            }
 189        }
 190    }
 191
 192    pub fn user_message(&self) -> Option<&UserMessage> {
 193        if let AgentThreadEntry::UserMessage(message) = self {
 194            Some(message)
 195        } else {
 196            None
 197        }
 198    }
 199
 200    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 201        if let AgentThreadEntry::ToolCall(call) = self {
 202            itertools::Either::Left(call.diffs())
 203        } else {
 204            itertools::Either::Right(std::iter::empty())
 205        }
 206    }
 207
 208    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 209        if let AgentThreadEntry::ToolCall(call) = self {
 210            itertools::Either::Left(call.terminals())
 211        } else {
 212            itertools::Either::Right(std::iter::empty())
 213        }
 214    }
 215
 216    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 217        if let AgentThreadEntry::ToolCall(ToolCall {
 218            locations,
 219            resolved_locations,
 220            ..
 221        }) = self
 222        {
 223            Some((
 224                locations.get(ix)?.clone(),
 225                resolved_locations.get(ix)?.clone()?,
 226            ))
 227        } else {
 228            None
 229        }
 230    }
 231}
 232
/// A tool invocation shown in the thread, built from an `acp::ToolCall` and
/// kept up to date through `update_fields`.
#[derive(Debug)]
pub struct ToolCall {
    /// Identifier of the tool call, taken from the ACP `tool_call_id`.
    pub id: acp::ToolCallId,
    /// Markdown label derived from the ACP title.
    pub label: Entity<Markdown>,
    /// The ACP tool kind; it also decides how the title is turned into `label`.
    pub kind: acp::ToolKind,
    /// Content items (content blocks, diffs, terminals) produced by the call.
    pub content: Vec<ToolCallContent>,
    /// Current status of the call.
    pub status: ToolCallStatus,
    /// Locations reported for the call.
    pub locations: Vec<acp::ToolCallLocation>,
    /// Resolved counterparts of `locations`, looked up by the same index in
    /// `AgentThreadEntry::location`. Starts out empty in `from_acp`.
    pub resolved_locations: Vec<Option<AgentLocation>>,
    /// Raw JSON input of the call, if provided.
    pub raw_input: Option<serde_json::Value>,
    /// Markdown produced from `raw_input`, when it could be created.
    pub raw_input_markdown: Option<Entity<Markdown>>,
    /// Raw JSON output of the call, if provided.
    pub raw_output: Option<serde_json::Value>,
    /// Programmatic tool name read from the ACP meta (see `TOOL_NAME_META_KEY`).
    pub tool_name: Option<SharedString>,
    /// Subagent session details read from the ACP meta, if present.
    pub subagent_session_info: Option<SubagentSessionInfo>,
}
 248
 249impl ToolCall {
 250    fn from_acp(
 251        tool_call: acp::ToolCall,
 252        status: ToolCallStatus,
 253        language_registry: Arc<LanguageRegistry>,
 254        path_style: PathStyle,
 255        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 256        cx: &mut App,
 257    ) -> Result<Self> {
 258        let title = if tool_call.kind == acp::ToolKind::Execute {
 259            tool_call.title
 260        } else if tool_call.kind == acp::ToolKind::Edit {
 261            MarkdownEscaped(tool_call.title.as_str()).to_string()
 262        } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
 263            first_line.to_owned() + ""
 264        } else {
 265            tool_call.title
 266        };
 267        let mut content = Vec::with_capacity(tool_call.content.len());
 268        for item in tool_call.content {
 269            if let Some(item) = ToolCallContent::from_acp(
 270                item,
 271                language_registry.clone(),
 272                path_style,
 273                terminals,
 274                cx,
 275            )? {
 276                content.push(item);
 277            }
 278        }
 279
 280        let raw_input_markdown = tool_call
 281            .raw_input
 282            .as_ref()
 283            .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
 284
 285        let tool_name = tool_name_from_meta(&tool_call.meta);
 286
 287        let subagent_session_info = subagent_session_info_from_meta(&tool_call.meta);
 288
 289        let result = Self {
 290            id: tool_call.tool_call_id,
 291            label: cx
 292                .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
 293            kind: tool_call.kind,
 294            content,
 295            locations: tool_call.locations,
 296            resolved_locations: Vec::default(),
 297            status,
 298            raw_input: tool_call.raw_input,
 299            raw_input_markdown,
 300            raw_output: tool_call.raw_output,
 301            tool_name,
 302            subagent_session_info,
 303        };
 304        Ok(result)
 305    }
 306
 307    fn update_fields(
 308        &mut self,
 309        fields: acp::ToolCallUpdateFields,
 310        meta: Option<acp::Meta>,
 311        language_registry: Arc<LanguageRegistry>,
 312        path_style: PathStyle,
 313        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 314        cx: &mut App,
 315    ) -> Result<()> {
 316        let acp::ToolCallUpdateFields {
 317            kind,
 318            status,
 319            title,
 320            content,
 321            locations,
 322            raw_input,
 323            raw_output,
 324            ..
 325        } = fields;
 326
 327        if let Some(kind) = kind {
 328            self.kind = kind;
 329        }
 330
 331        if let Some(status) = status {
 332            self.status = status.into();
 333        }
 334
 335        if let Some(subagent_session_info) = subagent_session_info_from_meta(&meta) {
 336            self.subagent_session_info = Some(subagent_session_info);
 337        }
 338
 339        if let Some(title) = title {
 340            if self.kind == acp::ToolKind::Execute {
 341                for terminal in self.terminals() {
 342                    terminal.update(cx, |terminal, cx| {
 343                        terminal.update_command_label(&title, cx);
 344                    });
 345                }
 346            }
 347            self.label.update(cx, |label, cx| {
 348                if self.kind == acp::ToolKind::Execute {
 349                    label.replace(title, cx);
 350                } else if self.kind == acp::ToolKind::Edit {
 351                    label.replace(MarkdownEscaped(&title).to_string(), cx)
 352                } else if let Some((first_line, _)) = title.split_once("\n") {
 353                    label.replace(first_line.to_owned() + "", cx);
 354                } else {
 355                    label.replace(title, cx);
 356                }
 357            });
 358        }
 359
 360        if let Some(content) = content {
 361            let mut new_content_len = content.len();
 362            let mut content = content.into_iter();
 363
 364            // Reuse existing content if we can
 365            for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
 366                let valid_content =
 367                    old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
 368                if !valid_content {
 369                    new_content_len -= 1;
 370                }
 371            }
 372            for new in content {
 373                if let Some(new) = ToolCallContent::from_acp(
 374                    new,
 375                    language_registry.clone(),
 376                    path_style,
 377                    terminals,
 378                    cx,
 379                )? {
 380                    self.content.push(new);
 381                } else {
 382                    new_content_len -= 1;
 383                }
 384            }
 385            self.content.truncate(new_content_len);
 386        }
 387
 388        if let Some(locations) = locations {
 389            self.locations = locations;
 390        }
 391
 392        if let Some(raw_input) = raw_input {
 393            self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
 394            self.raw_input = Some(raw_input);
 395        }
 396
 397        if let Some(raw_output) = raw_output {
 398            if self.content.is_empty()
 399                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 400            {
 401                self.content
 402                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 403                        markdown,
 404                    }));
 405            }
 406            self.raw_output = Some(raw_output);
 407        }
 408        Ok(())
 409    }
 410
 411    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 412        self.content.iter().filter_map(|content| match content {
 413            ToolCallContent::Diff(diff) => Some(diff),
 414            ToolCallContent::ContentBlock(_) => None,
 415            ToolCallContent::Terminal(_) => None,
 416        })
 417    }
 418
 419    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 420        self.content.iter().filter_map(|content| match content {
 421            ToolCallContent::Terminal(terminal) => Some(terminal),
 422            ToolCallContent::ContentBlock(_) => None,
 423            ToolCallContent::Diff(_) => None,
 424        })
 425    }
 426
 427    pub fn is_subagent(&self) -> bool {
 428        self.tool_name.as_ref().is_some_and(|s| s == "spawn_agent")
 429            || self.subagent_session_info.is_some()
 430    }
 431
 432    pub fn to_markdown(&self, cx: &App) -> String {
 433        let mut markdown = format!(
 434            "**Tool Call: {}**\nStatus: {}\n\n",
 435            self.label.read(cx).source(),
 436            self.status
 437        );
 438        for content in &self.content {
 439            markdown.push_str(content.to_markdown(cx).as_str());
 440            markdown.push_str("\n\n");
 441        }
 442        markdown
 443    }
 444
 445    async fn resolve_location(
 446        location: acp::ToolCallLocation,
 447        project: WeakEntity<Project>,
 448        cx: &mut AsyncApp,
 449    ) -> Option<ResolvedLocation> {
 450        let buffer = project
 451            .update(cx, |project, cx| {
 452                project
 453                    .project_path_for_absolute_path(&location.path, cx)
 454                    .map(|path| project.open_buffer(path, cx))
 455            })
 456            .ok()??;
 457        let buffer = buffer.await.log_err()?;
 458        let position = buffer.update(cx, |buffer, _| {
 459            let snapshot = buffer.snapshot();
 460            if let Some(row) = location.line {
 461                let column = snapshot.indent_size_for_line(row).len;
 462                let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 463                snapshot.anchor_before(point)
 464            } else {
 465                Anchor::min_for_buffer(snapshot.remote_id())
 466            }
 467        });
 468
 469        Some(ResolvedLocation { buffer, position })
 470    }
 471
 472    fn resolve_locations(
 473        &self,
 474        project: Entity<Project>,
 475        cx: &mut App,
 476    ) -> Task<Vec<Option<ResolvedLocation>>> {
 477        let locations = self.locations.clone();
 478        project.update(cx, |_, cx| {
 479            cx.spawn(async move |project, cx| {
 480                let mut new_locations = Vec::new();
 481                for location in locations {
 482                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 483                }
 484                new_locations
 485            })
 486        })
 487    }
 488}
 489
// Separate so we can hold a strong reference to the buffer
// for saving on the thread
/// A tool call location resolved to an open buffer and an anchor within it.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ResolvedLocation {
    /// Strong handle to the buffer the location points into.
    buffer: Entity<Buffer>,
    /// Anchor inside `buffer` for the location.
    position: Anchor,
}
 497
 498impl From<&ResolvedLocation> for AgentLocation {
 499    fn from(value: &ResolvedLocation) -> Self {
 500        Self {
 501            buffer: value.buffer.downgrade(),
 502            position: value.position,
 503        }
 504    }
 505}
 506
/// Extra parameters attached to a selected permission option.
#[derive(Debug, Clone)]
pub enum SelectedPermissionParams {
    /// Parameters for a terminal permission.
    // NOTE(review): `patterns` presumably lists command patterns covered by the selection — confirm with the consumer.
    Terminal { patterns: Vec<String> },
}
 511
/// The permission option that was selected, plus optional extra parameters.
///
/// Converting into `acp::SelectedPermissionOutcome` keeps only `option_id`.
#[derive(Debug)]
pub struct SelectedPermissionOutcome {
    /// Identifier of the selected option.
    pub option_id: acp::PermissionOptionId,
    /// Kind of the selected option.
    pub option_kind: acp::PermissionOptionKind,
    /// Optional extra parameters; `None` when created through `new`.
    pub params: Option<SelectedPermissionParams>,
}
 518
 519impl SelectedPermissionOutcome {
 520    pub fn new(option_id: acp::PermissionOptionId, option_kind: acp::PermissionOptionKind) -> Self {
 521        Self {
 522            option_id,
 523            option_kind,
 524            params: None,
 525        }
 526    }
 527
 528    pub fn params(mut self, params: Option<SelectedPermissionParams>) -> Self {
 529        self.params = params;
 530        self
 531    }
 532}
 533
 534impl From<SelectedPermissionOutcome> for acp::SelectedPermissionOutcome {
 535    fn from(value: SelectedPermissionOutcome) -> Self {
 536        Self::new(value.option_id)
 537    }
 538}
 539
/// Result of a permission request.
#[derive(Debug)]
pub enum RequestPermissionOutcome {
    /// The request was cancelled without a selection.
    Cancelled,
    /// An option was selected.
    Selected(SelectedPermissionOutcome),
}
 545
 546impl From<RequestPermissionOutcome> for acp::RequestPermissionOutcome {
 547    fn from(value: RequestPermissionOutcome) -> Self {
 548        match value {
 549            RequestPermissionOutcome::Cancelled => Self::Cancelled,
 550            RequestPermissionOutcome::Selected(outcome) => Self::Selected(outcome.into()),
 551        }
 552    }
 553}
 554
/// Status of a [`ToolCall`] as tracked by the thread.
///
/// `WaitingForConfirmation`, `Rejected` and `Canceled` are never produced by
/// the `From<acp::ToolCallStatus>` conversion in this file.
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The tool call hasn't started running yet, but we start showing it to
    /// the user.
    Pending,
    /// The tool call is waiting for confirmation from the user.
    WaitingForConfirmation {
        /// The permission options offered for this call.
        options: PermissionOptions,
        /// One-shot sender for a `SelectedPermissionOutcome`.
        // NOTE(review): presumably used to deliver the user's selection back to the requester — confirm at the send site.
        respond_tx: oneshot::Sender<SelectedPermissionOutcome>,
    },
    /// The tool call is currently running.
    InProgress,
    /// The tool call completed successfully.
    Completed,
    /// The tool call failed.
    Failed,
    /// The user rejected the tool call.
    Rejected,
    /// The user canceled generation so the tool call was canceled.
    Canceled,
}
 576
 577impl From<acp::ToolCallStatus> for ToolCallStatus {
 578    fn from(status: acp::ToolCallStatus) -> Self {
 579        match status {
 580            acp::ToolCallStatus::Pending => Self::Pending,
 581            acp::ToolCallStatus::InProgress => Self::InProgress,
 582            acp::ToolCallStatus::Completed => Self::Completed,
 583            acp::ToolCallStatus::Failed => Self::Failed,
 584            _ => Self::Pending,
 585        }
 586    }
 587}
 588
 589impl Display for ToolCallStatus {
 590    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 591        write!(
 592            f,
 593            "{}",
 594            match self {
 595                ToolCallStatus::Pending => "Pending",
 596                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 597                ToolCallStatus::InProgress => "In Progress",
 598                ToolCallStatus::Completed => "Completed",
 599                ToolCallStatus::Failed => "Failed",
 600                ToolCallStatus::Rejected => "Rejected",
 601                ToolCallStatus::Canceled => "Canceled",
 602            }
 603        )
 604    }
 605}
 606
/// Displayable content accumulated from one or more `acp::ContentBlock`s.
///
/// A block starts as `Empty`. Appending to a `ResourceLink` or `Image` block
/// converts it into `Markdown` (see `append`).
#[derive(Debug, PartialEq, Clone)]
pub enum ContentBlock {
    /// Nothing has been appended yet.
    Empty,
    /// Markdown text held in a `Markdown` entity.
    Markdown { markdown: Entity<Markdown> },
    /// A single ACP resource link.
    ResourceLink { resource_link: acp::ResourceLink },
    /// A single image that was decoded successfully.
    Image { image: Arc<gpui::Image> },
}
 614
 615impl ContentBlock {
 616    pub fn new(
 617        block: acp::ContentBlock,
 618        language_registry: &Arc<LanguageRegistry>,
 619        path_style: PathStyle,
 620        cx: &mut App,
 621    ) -> Self {
 622        let mut this = Self::Empty;
 623        this.append(block, language_registry, path_style, cx);
 624        this
 625    }
 626
 627    pub fn new_combined(
 628        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 629        language_registry: Arc<LanguageRegistry>,
 630        path_style: PathStyle,
 631        cx: &mut App,
 632    ) -> Self {
 633        let mut this = Self::Empty;
 634        for block in blocks {
 635            this.append(block, &language_registry, path_style, cx);
 636        }
 637        this
 638    }
 639
 640    pub fn append(
 641        &mut self,
 642        block: acp::ContentBlock,
 643        language_registry: &Arc<LanguageRegistry>,
 644        path_style: PathStyle,
 645        cx: &mut App,
 646    ) {
 647        match (&mut *self, &block) {
 648            (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
 649                *self = ContentBlock::ResourceLink {
 650                    resource_link: resource_link.clone(),
 651                };
 652            }
 653            (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
 654                if let Some(image) = Self::decode_image(image_content) {
 655                    *self = ContentBlock::Image { image };
 656                } else {
 657                    let new_content = Self::image_md(image_content);
 658                    *self = Self::create_markdown_block(new_content, language_registry, cx);
 659                }
 660            }
 661            (ContentBlock::Empty, _) => {
 662                let new_content = Self::block_string_contents(&block, path_style);
 663                *self = Self::create_markdown_block(new_content, language_registry, cx);
 664            }
 665            (ContentBlock::Markdown { markdown }, _) => {
 666                let new_content = Self::block_string_contents(&block, path_style);
 667                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 668            }
 669            (ContentBlock::ResourceLink { resource_link }, _) => {
 670                let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
 671                let new_content = Self::block_string_contents(&block, path_style);
 672                let combined = format!("{}\n{}", existing_content, new_content);
 673                *self = Self::create_markdown_block(combined, language_registry, cx);
 674            }
 675            (ContentBlock::Image { .. }, _) => {
 676                let new_content = Self::block_string_contents(&block, path_style);
 677                let combined = format!("`Image`\n{}", new_content);
 678                *self = Self::create_markdown_block(combined, language_registry, cx);
 679            }
 680        }
 681    }
 682
 683    fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
 684        use base64::Engine as _;
 685
 686        let bytes = base64::engine::general_purpose::STANDARD
 687            .decode(image_content.data.as_bytes())
 688            .ok()?;
 689        let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
 690        Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
 691    }
 692
 693    fn create_markdown_block(
 694        content: String,
 695        language_registry: &Arc<LanguageRegistry>,
 696        cx: &mut App,
 697    ) -> ContentBlock {
 698        ContentBlock::Markdown {
 699            markdown: cx
 700                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 701        }
 702    }
 703
 704    fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
 705        match block {
 706            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 707            acp::ContentBlock::ResourceLink(resource_link) => {
 708                Self::resource_link_md(&resource_link.uri, path_style)
 709            }
 710            acp::ContentBlock::Resource(acp::EmbeddedResource {
 711                resource:
 712                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 713                        uri,
 714                        ..
 715                    }),
 716                ..
 717            }) => Self::resource_link_md(uri, path_style),
 718            acp::ContentBlock::Image(image) => Self::image_md(image),
 719            _ => String::new(),
 720        }
 721    }
 722
 723    fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
 724        if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
 725            uri.as_link().to_string()
 726        } else {
 727            uri.to_string()
 728        }
 729    }
 730
 731    fn image_md(_image: &acp::ImageContent) -> String {
 732        "`Image`".into()
 733    }
 734
 735    pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 736        match self {
 737            ContentBlock::Empty => "",
 738            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 739            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 740            ContentBlock::Image { .. } => "`Image`",
 741        }
 742    }
 743
 744    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 745        match self {
 746            ContentBlock::Empty => None,
 747            ContentBlock::Markdown { markdown } => Some(markdown),
 748            ContentBlock::ResourceLink { .. } => None,
 749            ContentBlock::Image { .. } => None,
 750        }
 751    }
 752
 753    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 754        match self {
 755            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 756            _ => None,
 757        }
 758    }
 759
 760    pub fn image(&self) -> Option<&Arc<gpui::Image>> {
 761        match self {
 762            ContentBlock::Image { image } => Some(image),
 763            _ => None,
 764        }
 765    }
 766}
 767
/// One content item of a [`ToolCall`].
#[derive(Debug)]
pub enum ToolCallContent {
    /// Generic content (markdown, resource link, image, or empty).
    ContentBlock(ContentBlock),
    /// A diff entity.
    Diff(Entity<Diff>),
    /// A terminal entity, looked up by id when converting from ACP.
    Terminal(Entity<Terminal>),
}
 774
 775impl ToolCallContent {
 776    pub fn from_acp(
 777        content: acp::ToolCallContent,
 778        language_registry: Arc<LanguageRegistry>,
 779        path_style: PathStyle,
 780        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 781        cx: &mut App,
 782    ) -> Result<Option<Self>> {
 783        match content {
 784            acp::ToolCallContent::Content(acp::Content { content, .. }) => {
 785                Ok(Some(Self::ContentBlock(ContentBlock::new(
 786                    content,
 787                    &language_registry,
 788                    path_style,
 789                    cx,
 790                ))))
 791            }
 792            acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
 793                Diff::finalized(
 794                    diff.path.to_string_lossy().into_owned(),
 795                    diff.old_text,
 796                    diff.new_text,
 797                    language_registry,
 798                    cx,
 799                )
 800            })))),
 801            acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
 802                .get(&terminal_id)
 803                .cloned()
 804                .map(|terminal| Some(Self::Terminal(terminal)))
 805                .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
 806            _ => Ok(None),
 807        }
 808    }
 809
 810    pub fn update_from_acp(
 811        &mut self,
 812        new: acp::ToolCallContent,
 813        language_registry: Arc<LanguageRegistry>,
 814        path_style: PathStyle,
 815        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 816        cx: &mut App,
 817    ) -> Result<bool> {
 818        let needs_update = match (&self, &new) {
 819            (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
 820                old_diff.read(cx).needs_update(
 821                    new_diff.old_text.as_deref().unwrap_or(""),
 822                    &new_diff.new_text,
 823                    cx,
 824                )
 825            }
 826            _ => true,
 827        };
 828
 829        if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
 830            if needs_update {
 831                *self = update;
 832            }
 833            Ok(true)
 834        } else {
 835            Ok(false)
 836        }
 837    }
 838
 839    pub fn to_markdown(&self, cx: &App) -> String {
 840        match self {
 841            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 842            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 843            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 844        }
 845    }
 846
 847    pub fn image(&self) -> Option<&Arc<gpui::Image>> {
 848        match self {
 849            Self::ContentBlock(content) => content.image(),
 850            _ => None,
 851        }
 852    }
 853}
 854
/// An update targeting an existing tool call, identified via `id()`.
#[derive(Debug, PartialEq)]
pub enum ToolCallUpdate {
    /// A field-level update as received over ACP.
    UpdateFields(acp::ToolCallUpdate),
    /// An update carrying a diff entity for the tool call.
    UpdateDiff(ToolCallUpdateDiff),
    /// An update carrying a terminal entity for the tool call.
    UpdateTerminal(ToolCallUpdateTerminal),
}
 861
impl ToolCallUpdate {
    /// Returns the ID of the tool call this update targets, regardless of
    /// which variant carries it.
    fn id(&self) -> &acp::ToolCallId {
        match self {
            Self::UpdateFields(update) => &update.tool_call_id,
            Self::UpdateDiff(diff) => &diff.id,
            Self::UpdateTerminal(terminal) => &terminal.id,
        }
    }
}
 871
/// Wraps a raw ACP tool call update as [`ToolCallUpdate::UpdateFields`].
impl From<acp::ToolCallUpdate> for ToolCallUpdate {
    fn from(update: acp::ToolCallUpdate) -> Self {
        Self::UpdateFields(update)
    }
}
 877
/// Wraps a diff update as [`ToolCallUpdate::UpdateDiff`].
impl From<ToolCallUpdateDiff> for ToolCallUpdate {
    fn from(diff: ToolCallUpdateDiff) -> Self {
        Self::UpdateDiff(diff)
    }
}
 883
/// A tool call update that carries a diff entity.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateDiff {
    /// ID of the tool call being updated.
    pub id: acp::ToolCallId,
    /// The diff entity associated with the update.
    pub diff: Entity<Diff>,
}
 889
/// Wraps a terminal update as [`ToolCallUpdate::UpdateTerminal`].
impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
    fn from(terminal: ToolCallUpdateTerminal) -> Self {
        Self::UpdateTerminal(terminal)
    }
}
 895
/// A tool call update that carries a terminal entity.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateTerminal {
    /// ID of the tool call being updated.
    pub id: acp::ToolCallId,
    /// The terminal entity associated with the update.
    pub terminal: Entity<Terminal>,
}
 901
/// An ordered list of plan entries; the default plan is empty.
#[derive(Debug, Default)]
pub struct Plan {
    /// The plan's entries, in order.
    pub entries: Vec<PlanEntry>,
}
 906
/// Summary counts over a [`Plan`], borrowing from the plan's entries.
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry whose status is in progress, if any.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Number of entries that are pending or in progress.
    pub pending: u32,
    /// Number of completed entries.
    pub completed: u32,
}
 913
 914impl Plan {
 915    pub fn is_empty(&self) -> bool {
 916        self.entries.is_empty()
 917    }
 918
 919    pub fn stats(&self) -> PlanStats<'_> {
 920        let mut stats = PlanStats {
 921            in_progress_entry: None,
 922            pending: 0,
 923            completed: 0,
 924        };
 925
 926        for entry in &self.entries {
 927            match &entry.status {
 928                acp::PlanEntryStatus::Pending => {
 929                    stats.pending += 1;
 930                }
 931                acp::PlanEntryStatus::InProgress => {
 932                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 933                    stats.pending += 1;
 934                }
 935                acp::PlanEntryStatus::Completed => {
 936                    stats.completed += 1;
 937                }
 938                _ => {}
 939            }
 940        }
 941
 942        stats
 943    }
 944}
 945
/// A single entry in a [`Plan`].
#[derive(Debug)]
pub struct PlanEntry {
    /// The entry's text, held as a Markdown entity.
    pub content: Entity<Markdown>,
    /// Priority as reported over ACP.
    pub priority: acp::PlanEntryPriority,
    /// Status as reported over ACP.
    pub status: acp::PlanEntryStatus,
}
 952
 953impl PlanEntry {
 954    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 955        Self {
 956            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 957            priority: entry.priority,
 958            status: entry.status,
 959        }
 960    }
 961}
 962
/// Token accounting for a thread.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TokenUsage {
    /// Maximum number of tokens; `ratio` treats `0` as "unknown".
    pub max_tokens: u64,
    /// Tokens used so far; compared against `max_tokens` by `ratio`.
    pub used_tokens: u64,
    /// Input token count.
    pub input_tokens: u64,
    /// Output token count.
    pub output_tokens: u64,
    /// Maximum number of output tokens, when known.
    pub max_output_tokens: Option<u64>,
}
 971
/// Fraction of `max_tokens` at or above which `TokenUsage::ratio` reports
/// `TokenUsageRatio::Warning`.
pub const TOKEN_USAGE_WARNING_THRESHOLD: f32 = 0.8;
 973
 974impl TokenUsage {
 975    pub fn ratio(&self) -> TokenUsageRatio {
 976        #[cfg(debug_assertions)]
 977        let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
 978            .unwrap_or(TOKEN_USAGE_WARNING_THRESHOLD.to_string())
 979            .parse()
 980            .unwrap();
 981        #[cfg(not(debug_assertions))]
 982        let warning_threshold: f32 = TOKEN_USAGE_WARNING_THRESHOLD;
 983
 984        // When the maximum is unknown because there is no selected model,
 985        // avoid showing the token limit warning.
 986        if self.max_tokens == 0 {
 987            TokenUsageRatio::Normal
 988        } else if self.used_tokens >= self.max_tokens {
 989            TokenUsageRatio::Exceeded
 990        } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
 991            TokenUsageRatio::Warning
 992        } else {
 993            TokenUsageRatio::Normal
 994        }
 995    }
 996}
 997
/// Classification of token usage produced by `TokenUsage::ratio`.
///
/// The derived ordering follows declaration order:
/// `Normal < Warning < Exceeded`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum TokenUsageRatio {
    /// Below the warning threshold, or the maximum is unknown.
    Normal,
    /// At or above the warning threshold but below the maximum.
    Warning,
    /// Usage has reached or passed the maximum.
    Exceeded,
}
1004
/// Details about a retry, carried by `AcpThreadEvent::Retry`.
#[derive(Debug, Clone)]
pub struct RetryStatus {
    /// Message of the most recent error.
    pub last_error: SharedString,
    /// The attempt number.
    pub attempt: usize,
    /// The maximum number of attempts.
    pub max_attempts: usize,
    /// NOTE(review): presumably when the retry wait started — confirm against the producer.
    pub started_at: Instant,
    /// NOTE(review): presumably how long the retry wait lasts — confirm against the producer.
    pub duration: Duration,
}
1013
/// Bookkeeping for the turn currently in flight; `AcpThread::status` reports
/// `Generating` while one is stored.
struct RunningTurn {
    /// Identifier of this turn.
    id: u32,
    /// The task associated with the turn.
    send_task: Task<()>,
}
1018
/// State for a single agent session: its entries, plan, title, connection,
/// terminals, and streaming buffers.
pub struct AcpThread {
    /// ID of the session this thread represents.
    session_id: acp::SessionId,
    /// Working directories for the session, when set.
    work_dirs: Option<PathList>,
    /// ID of the parent session, if this thread has one.
    parent_session_id: Option<acp::SessionId>,
    /// The thread's title; takes precedence over `provisional_title` in `title()`.
    title: Option<SharedString>,
    /// A placeholder title shown until a real title is set.
    provisional_title: Option<SharedString>,
    /// The thread's entries (user messages, assistant messages, tool calls, ...), in order.
    entries: Vec<AgentThreadEntry>,
    plan: Plan,
    project: Entity<Project>,
    action_log: Entity<ActionLog>,
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    turn_id: u32,
    /// The turn currently running, if any; `status()` reports `Generating` while set.
    running_turn: Option<RunningTurn>,
    connection: Rc<dyn AgentConnection>,
    token_usage: Option<TokenUsage>,
    /// Latest prompt capabilities, kept current by `_observe_prompt_capabilities`.
    prompt_capabilities: acp::PromptCapabilities,
    /// Task that applies each received capabilities value to `prompt_capabilities`.
    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
    had_error: bool,
    /// The user's unsent prompt text, persisted so it can be restored when reloading the thread.
    draft_prompt: Option<Vec<acp::ContentBlock>>,
    /// The initial scroll position for the thread view, set during session registration.
    ui_scroll_position: Option<gpui::ListOffset>,
    /// Buffer for smooth text streaming. Holds text that has been received from
    /// the model but not yet revealed in the UI. A timer task drains this buffer
    /// gradually to create a fluid typing effect instead of choppy chunk-at-a-time
    /// updates.
    streaming_text_buffer: Option<StreamingTextBuffer>,
}
1050
/// Text waiting to be revealed gradually in a Markdown entity.
struct StreamingTextBuffer {
    /// Text received from the model but not yet appended to the Markdown source.
    pending: String,
    /// The number of bytes to reveal per timer turn.
    bytes_to_reveal_per_tick: usize,
    /// The Markdown entity being streamed into.
    target: Entity<Markdown>,
    /// Timer task that periodically moves text from `pending` into `target`.
    _reveal_task: Task<()>,
}
1061
impl StreamingTextBuffer {
    /// The number of milliseconds between each timer tick, controlling how quickly
    /// text is revealed.
    const TASK_UPDATE_MS: u64 = 16;
    /// The time in milliseconds to reveal the entire pending text. Together with
    /// `TASK_UPDATE_MS` this determines `bytes_to_reveal_per_tick`.
    const REVEAL_TARGET: f32 = 200.0;
}
1069
/// Builds action-log telemetry metadata from a thread, using the connection's
/// telemetry ID and a clone of the thread's session ID.
impl From<&AcpThread> for ActionLogTelemetry {
    fn from(value: &AcpThread) -> Self {
        Self {
            agent_telemetry_id: value.connection().telemetry_id(),
            session_id: value.session_id.0.clone(),
        }
    }
}
1078
/// Events emitted by [`AcpThread`].
#[derive(Debug)]
pub enum AcpThreadEvent {
    /// A new entry was appended to the thread.
    NewEntry,
    /// The title or provisional title changed.
    TitleUpdated,
    /// The stored token usage was replaced.
    TokenUsageUpdated,
    /// The entry at the given index was modified.
    EntryUpdated(usize),
    /// The entries in the given index range were removed.
    EntriesRemoved(Range<usize>),
    ToolAuthorizationRequested(acp::ToolCallId),
    ToolAuthorizationReceived(acp::ToolCallId),
    /// A retry status was reported.
    Retry(RetryStatus),
    /// A subagent session with the given ID was spawned.
    SubagentSpawned(acp::SessionId),
    Stopped(acp::StopReason),
    Error,
    LoadError(LoadError),
    /// The thread's prompt capabilities were replaced with a newly received value.
    PromptCapabilitiesUpdated,
    Refusal,
    /// The agent reported a new list of available commands.
    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
    /// The agent reported a new current mode.
    ModeUpdated(acp::SessionModeId),
    /// The agent reported new config options.
    ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
    /// The thread's working directories were set.
    WorkingDirectoriesUpdated,
}
1100
// Allows `AcpThread` to emit `AcpThreadEvent`s through its context.
impl EventEmitter<AcpThreadEvent> for AcpThread {}
1102
/// Terminal lifecycle events, each tagged with the terminal they concern.
// NOTE(review): presumably produced by whatever hosts the terminals and consumed
// by the thread — confirm against callers.
#[derive(Debug, Clone)]
pub enum TerminalProviderEvent {
    /// A terminal was created.
    Created {
        terminal_id: acp::TerminalId,
        label: String,
        cwd: Option<PathBuf>,
        output_byte_limit: Option<u64>,
        terminal: Entity<::terminal::Terminal>,
    },
    /// Raw output bytes for a terminal.
    Output {
        terminal_id: acp::TerminalId,
        data: Vec<u8>,
    },
    /// A terminal's title changed.
    TitleChanged {
        terminal_id: acp::TerminalId,
        title: String,
    },
    /// A terminal exited with the given status.
    Exit {
        terminal_id: acp::TerminalId,
        status: acp::TerminalExitStatus,
    },
}
1125
/// Commands addressed to a specific terminal.
// NOTE(review): presumably sent to whatever hosts the terminals — confirm against callers.
#[derive(Debug, Clone)]
pub enum TerminalProviderCommand {
    /// Write the given bytes to the terminal's input.
    WriteInput {
        terminal_id: acp::TerminalId,
        bytes: Vec<u8>,
    },
    /// Resize the terminal to the given number of columns and rows.
    Resize {
        terminal_id: acp::TerminalId,
        cols: u16,
        rows: u16,
    },
    /// Close the terminal.
    Close {
        terminal_id: acp::TerminalId,
    },
}
1141
/// Whether a thread currently has a running turn; see `AcpThread::status`.
#[derive(PartialEq, Eq, Debug)]
pub enum ThreadStatus {
    /// No turn is running.
    Idle,
    /// A turn is running.
    Generating,
}
1147
/// Errors reported when loading fails; the `Display` impl gives the
/// user-facing message for each variant.
#[derive(Debug, Clone)]
pub enum LoadError {
    /// The version found is older than the minimum supported version.
    Unsupported {
        command: SharedString,
        current_version: SharedString,
        minimum_version: SharedString,
    },
    /// Installation failed, with a message.
    FailedToInstall(SharedString),
    /// The server process exited with the given status.
    Exited {
        status: ExitStatus,
    },
    /// Any other failure, with a message.
    Other(SharedString),
}
1161
/// Formats a human-readable message for each load failure.
impl Display for LoadError {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            LoadError::Unsupported {
                // The `command` field is interpolated as `{path}` in the message.
                command: path,
                current_version,
                minimum_version,
            } => {
                write!(
                    f,
                    "version {current_version} from {path} is not supported (need at least {minimum_version})"
                )
            }
            LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
            LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
            // `Other` carries a complete message and is printed verbatim.
            LoadError::Other(msg) => write!(f, "{msg}"),
        }
    }
}
1181
// Marks `LoadError` as a standard error type; the default trait methods are used.
impl Error for LoadError {}
1183
1184impl AcpThread {
1185    pub fn new(
1186        parent_session_id: Option<acp::SessionId>,
1187        title: Option<SharedString>,
1188        work_dirs: Option<PathList>,
1189        connection: Rc<dyn AgentConnection>,
1190        project: Entity<Project>,
1191        action_log: Entity<ActionLog>,
1192        session_id: acp::SessionId,
1193        mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1194        cx: &mut Context<Self>,
1195    ) -> Self {
1196        let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1197        let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1198            loop {
1199                let caps = prompt_capabilities_rx.recv().await?;
1200                this.update(cx, |this, cx| {
1201                    this.prompt_capabilities = caps;
1202                    cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1203                })?;
1204            }
1205        });
1206
1207        Self {
1208            parent_session_id,
1209            work_dirs,
1210            action_log,
1211            shared_buffers: Default::default(),
1212            entries: Default::default(),
1213            plan: Default::default(),
1214            title,
1215            provisional_title: None,
1216            project,
1217            running_turn: None,
1218            turn_id: 0,
1219            connection,
1220            session_id,
1221            token_usage: None,
1222            prompt_capabilities,
1223            _observe_prompt_capabilities: task,
1224            terminals: HashMap::default(),
1225            pending_terminal_output: HashMap::default(),
1226            pending_terminal_exit: HashMap::default(),
1227            had_error: false,
1228            draft_prompt: None,
1229            ui_scroll_position: None,
1230            streaming_text_buffer: None,
1231        }
1232    }
1233
    /// Returns the ID of the parent session, if this thread has one.
    pub fn parent_session_id(&self) -> Option<&acp::SessionId> {
        self.parent_session_id.as_ref()
    }
1237
    /// Returns a clone of the most recently stored prompt capabilities.
    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
        self.prompt_capabilities.clone()
    }
1241
    /// Returns the stored draft prompt blocks, if any.
    pub fn draft_prompt(&self) -> Option<&[acp::ContentBlock]> {
        self.draft_prompt.as_deref()
    }
1245
    /// Replaces the stored draft prompt; `None` clears it. No event is emitted.
    pub fn set_draft_prompt(&mut self, prompt: Option<Vec<acp::ContentBlock>>) {
        self.draft_prompt = prompt;
    }
1249
    /// Returns the stored scroll position for the thread view, if any.
    pub fn ui_scroll_position(&self) -> Option<gpui::ListOffset> {
        self.ui_scroll_position
    }
1253
    /// Replaces the stored scroll position; `None` clears it. No event is emitted.
    pub fn set_ui_scroll_position(&mut self, position: Option<gpui::ListOffset>) {
        self.ui_scroll_position = position;
    }
1257
    /// Returns the agent connection backing this thread.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }
1261
    /// Returns the action log entity associated with this thread.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
1265
    /// Returns the project entity associated with this thread.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
1269
1270    pub fn title(&self) -> Option<SharedString> {
1271        self.title
1272            .clone()
1273            .or_else(|| self.provisional_title.clone())
1274    }
1275
    /// Returns `true` while a provisional title is stored.
    pub fn has_provisional_title(&self) -> bool {
        self.provisional_title.is_some()
    }
1279
    /// Returns all entries in the thread, in order.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
1283
    /// Returns the ID of the session this thread represents.
    pub fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
1287
    /// Returns the thread's working directories, if set.
    pub fn work_dirs(&self) -> Option<&PathList> {
        self.work_dirs.as_ref()
    }
1291
    /// Stores the working directories and emits
    /// `AcpThreadEvent::WorkingDirectoriesUpdated` (unconditionally, even if
    /// the value is unchanged).
    pub fn set_work_dirs(&mut self, work_dirs: PathList, cx: &mut Context<Self>) {
        self.work_dirs = Some(work_dirs);
        cx.emit(AcpThreadEvent::WorkingDirectoriesUpdated)
    }
1296
1297    pub fn status(&self) -> ThreadStatus {
1298        if self.running_turn.is_some() {
1299            ThreadStatus::Generating
1300        } else {
1301            ThreadStatus::Idle
1302        }
1303    }
1304
    /// Returns the thread's stored error flag.
    pub fn had_error(&self) -> bool {
        self.had_error
    }
1308
1309    pub fn is_waiting_for_confirmation(&self) -> bool {
1310        for entry in self.entries.iter().rev() {
1311            match entry {
1312                AgentThreadEntry::UserMessage(_) => return false,
1313                AgentThreadEntry::ToolCall(ToolCall {
1314                    status: ToolCallStatus::WaitingForConfirmation { .. },
1315                    ..
1316                }) => return true,
1317                AgentThreadEntry::ToolCall(_)
1318                | AgentThreadEntry::AssistantMessage(_)
1319                | AgentThreadEntry::CompletedPlan(_) => {}
1320            }
1321        }
1322        false
1323    }
1324
    /// Returns the most recently stored token usage, if any.
    pub fn token_usage(&self) -> Option<&TokenUsage> {
        self.token_usage.as_ref()
    }
1328
1329    pub fn has_pending_edit_tool_calls(&self) -> bool {
1330        for entry in self.entries.iter().rev() {
1331            match entry {
1332                AgentThreadEntry::UserMessage(_) => return false,
1333                AgentThreadEntry::ToolCall(
1334                    call @ ToolCall {
1335                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1336                        ..
1337                    },
1338                ) if call.diffs().next().is_some() => {
1339                    return true;
1340                }
1341                AgentThreadEntry::ToolCall(_)
1342                | AgentThreadEntry::AssistantMessage(_)
1343                | AgentThreadEntry::CompletedPlan(_) => {}
1344            }
1345        }
1346
1347        false
1348    }
1349
1350    pub fn has_in_progress_tool_calls(&self) -> bool {
1351        for entry in self.entries.iter().rev() {
1352            match entry {
1353                AgentThreadEntry::UserMessage(_) => return false,
1354                AgentThreadEntry::ToolCall(ToolCall {
1355                    status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1356                    ..
1357                }) => {
1358                    return true;
1359                }
1360                AgentThreadEntry::ToolCall(_)
1361                | AgentThreadEntry::AssistantMessage(_)
1362                | AgentThreadEntry::CompletedPlan(_) => {}
1363            }
1364        }
1365
1366        false
1367    }
1368
1369    pub fn used_tools_since_last_user_message(&self) -> bool {
1370        for entry in self.entries.iter().rev() {
1371            match entry {
1372                AgentThreadEntry::UserMessage(..) => return false,
1373                AgentThreadEntry::AssistantMessage(..) | AgentThreadEntry::CompletedPlan(..) => {
1374                    continue;
1375                }
1376                AgentThreadEntry::ToolCall(..) => return true,
1377            }
1378        }
1379
1380        false
1381    }
1382
1383    pub fn handle_session_update(
1384        &mut self,
1385        update: acp::SessionUpdate,
1386        cx: &mut Context<Self>,
1387    ) -> Result<(), acp::Error> {
1388        match update {
1389            acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1390                self.push_user_content_block(None, content, cx);
1391            }
1392            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1393                self.push_assistant_content_block(content, false, cx);
1394            }
1395            acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1396                self.push_assistant_content_block(content, true, cx);
1397            }
1398            acp::SessionUpdate::ToolCall(tool_call) => {
1399                self.upsert_tool_call(tool_call, cx)?;
1400            }
1401            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1402                self.update_tool_call(tool_call_update, cx)?;
1403            }
1404            acp::SessionUpdate::Plan(plan) => {
1405                self.update_plan(plan, cx);
1406            }
1407            acp::SessionUpdate::SessionInfoUpdate(info_update) => {
1408                if let acp::MaybeUndefined::Value(title) = info_update.title {
1409                    let had_provisional = self.provisional_title.take().is_some();
1410                    let title: SharedString = title.into();
1411                    if self.title.as_ref() != Some(&title) {
1412                        self.title = Some(title);
1413                        cx.emit(AcpThreadEvent::TitleUpdated);
1414                    } else if had_provisional {
1415                        cx.emit(AcpThreadEvent::TitleUpdated);
1416                    }
1417                }
1418            }
1419            acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1420                available_commands,
1421                ..
1422            }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1423            acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1424                current_mode_id,
1425                ..
1426            }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1427            acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1428                config_options,
1429                ..
1430            }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1431            _ => {}
1432        }
1433        Ok(())
1434    }
1435
    /// Appends a user content block without indentation.
    ///
    /// Shorthand for [`Self::push_user_content_block_with_indent`] with
    /// `indented` set to `false`.
    pub fn push_user_content_block(
        &mut self,
        message_id: Option<UserMessageId>,
        chunk: acp::ContentBlock,
        cx: &mut Context<Self>,
    ) {
        self.push_user_content_block_with_indent(message_id, chunk, false, cx)
    }
1444
1445    pub fn push_user_content_block_with_indent(
1446        &mut self,
1447        message_id: Option<UserMessageId>,
1448        chunk: acp::ContentBlock,
1449        indented: bool,
1450        cx: &mut Context<Self>,
1451    ) {
1452        let language_registry = self.project.read(cx).languages().clone();
1453        let path_style = self.project.read(cx).path_style(cx);
1454        let entries_len = self.entries.len();
1455
1456        if let Some(last_entry) = self.entries.last_mut()
1457            && let AgentThreadEntry::UserMessage(UserMessage {
1458                id,
1459                content,
1460                chunks,
1461                indented: existing_indented,
1462                ..
1463            }) = last_entry
1464            && *existing_indented == indented
1465        {
1466            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1467            *id = message_id.or(id.take());
1468            content.append(chunk.clone(), &language_registry, path_style, cx);
1469            chunks.push(chunk);
1470            let idx = entries_len - 1;
1471            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1472        } else {
1473            let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1474            self.push_entry(
1475                AgentThreadEntry::UserMessage(UserMessage {
1476                    id: message_id,
1477                    content,
1478                    chunks: vec![chunk],
1479                    checkpoint: None,
1480                    indented,
1481                }),
1482                cx,
1483            );
1484        }
1485    }
1486
    /// Appends an assistant content block without indentation.
    ///
    /// Shorthand for [`Self::push_assistant_content_block_with_indent`] with
    /// `indented` set to `false`. `is_thought` selects between a thought chunk
    /// and a regular message chunk.
    pub fn push_assistant_content_block(
        &mut self,
        chunk: acp::ContentBlock,
        is_thought: bool,
        cx: &mut Context<Self>,
    ) {
        self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
    }
1495
1496    pub fn push_assistant_content_block_with_indent(
1497        &mut self,
1498        chunk: acp::ContentBlock,
1499        is_thought: bool,
1500        indented: bool,
1501        cx: &mut Context<Self>,
1502    ) {
1503        let path_style = self.project.read(cx).path_style(cx);
1504
1505        // For text chunks going to an existing Markdown block, buffer for smooth
1506        // streaming instead of appending all at once which may feel more choppy.
1507        if let acp::ContentBlock::Text(text_content) = &chunk {
1508            if let Some(markdown) = self.streaming_markdown_target(is_thought, indented) {
1509                let entries_len = self.entries.len();
1510                cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
1511                self.buffer_streaming_text(&markdown, text_content.text.clone(), cx);
1512                return;
1513            }
1514        }
1515
1516        let language_registry = self.project.read(cx).languages().clone();
1517        let entries_len = self.entries.len();
1518        if let Some(last_entry) = self.entries.last_mut()
1519            && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1520                chunks,
1521                indented: existing_indented,
1522                is_subagent_output: _,
1523            }) = last_entry
1524            && *existing_indented == indented
1525        {
1526            let idx = entries_len - 1;
1527            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1528            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1529            match (chunks.last_mut(), is_thought) {
1530                (Some(AssistantMessageChunk::Message { block }), false)
1531                | (Some(AssistantMessageChunk::Thought { block }), true) => {
1532                    block.append(chunk, &language_registry, path_style, cx)
1533                }
1534                _ => {
1535                    let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1536                    if is_thought {
1537                        chunks.push(AssistantMessageChunk::Thought { block })
1538                    } else {
1539                        chunks.push(AssistantMessageChunk::Message { block })
1540                    }
1541                }
1542            }
1543        } else {
1544            let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1545            let chunk = if is_thought {
1546                AssistantMessageChunk::Thought { block }
1547            } else {
1548                AssistantMessageChunk::Message { block }
1549            };
1550
1551            self.push_entry(
1552                AgentThreadEntry::AssistantMessage(AssistantMessage {
1553                    chunks: vec![chunk],
1554                    indented,
1555                    is_subagent_output: false,
1556                }),
1557                cx,
1558            );
1559        }
1560    }
1561
1562    fn streaming_markdown_target(
1563        &self,
1564        is_thought: bool,
1565        indented: bool,
1566    ) -> Option<Entity<Markdown>> {
1567        let last_entry = self.entries.last()?;
1568        if let AgentThreadEntry::AssistantMessage(AssistantMessage {
1569            chunks,
1570            indented: existing_indented,
1571            ..
1572        }) = last_entry
1573            && *existing_indented == indented
1574            && let [.., chunk] = chunks.as_slice()
1575        {
1576            match (chunk, is_thought) {
1577                (
1578                    AssistantMessageChunk::Message {
1579                        block: ContentBlock::Markdown { markdown },
1580                    },
1581                    false,
1582                )
1583                | (
1584                    AssistantMessageChunk::Thought {
1585                        block: ContentBlock::Markdown { markdown },
1586                    },
1587                    true,
1588                ) => Some(markdown.clone()),
1589                _ => None,
1590            }
1591        } else {
1592            None
1593        }
1594    }
1595
1596    /// Add text to the streaming buffer. If the target changed (e.g. switching
1597    /// from thoughts to message text), flush the old buffer first.
1598    fn buffer_streaming_text(
1599        &mut self,
1600        markdown: &Entity<Markdown>,
1601        text: String,
1602        cx: &mut Context<Self>,
1603    ) {
1604        if let Some(buffer) = &mut self.streaming_text_buffer {
1605            if buffer.target.entity_id() == markdown.entity_id() {
1606                buffer.pending.push_str(&text);
1607
1608                buffer.bytes_to_reveal_per_tick = (buffer.pending.len() as f32
1609                    / StreamingTextBuffer::REVEAL_TARGET
1610                    * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1611                    .ceil() as usize;
1612                return;
1613            }
1614            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1615        }
1616
1617        let target = markdown.clone();
1618        let _reveal_task = self.start_streaming_reveal(cx);
1619        let pending_len = text.len();
1620        let bytes_to_reveal = (pending_len as f32 / StreamingTextBuffer::REVEAL_TARGET
1621            * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1622            .ceil() as usize;
1623        self.streaming_text_buffer = Some(StreamingTextBuffer {
1624            pending: text,
1625            bytes_to_reveal_per_tick: bytes_to_reveal,
1626            target,
1627            _reveal_task,
1628        });
1629    }
1630
1631    /// Flush all buffered streaming text into the Markdown entity immediately.
1632    fn flush_streaming_text(
1633        streaming_text_buffer: &mut Option<StreamingTextBuffer>,
1634        cx: &mut Context<Self>,
1635    ) {
1636        if let Some(buffer) = streaming_text_buffer.take() {
1637            if !buffer.pending.is_empty() {
1638                buffer
1639                    .target
1640                    .update(cx, |markdown, cx| markdown.append(&buffer.pending, cx));
1641            }
1642        }
1643    }
1644
    /// Spawns a foreground task that periodically drains
    /// `streaming_text_buffer.pending` into the target `Markdown` entity,
    /// producing smooth, continuous text output.
    ///
    /// Each tick waits `TASK_UPDATE_MS` milliseconds and then moves up to
    /// `bytes_to_reveal_per_tick` bytes (rounded up to a character boundary)
    /// into the target. The task keeps ticking while a buffer exists, even when
    /// it is momentarily empty, and stops once the buffer has been removed or
    /// the thread can no longer be updated.
    fn start_streaming_reveal(&self, cx: &mut Context<Self>) -> Task<()> {
        cx.spawn(async move |this, cx| {
            loop {
                cx.background_executor()
                    .timer(Duration::from_millis(StreamingTextBuffer::TASK_UPDATE_MS))
                    .await;

                let should_continue = this
                    .update(cx, |this, cx| {
                        // The buffer was flushed or replaced: nothing left to reveal.
                        let Some(buffer) = &mut this.streaming_text_buffer else {
                            return false;
                        };

                        // Nothing pending right now; keep ticking in case more text arrives.
                        if buffer.pending.is_empty() {
                            return true;
                        }

                        let pending_len = buffer.pending.len();

                        // Never split a multi-byte character, and never read past the end.
                        let byte_boundary = buffer
                            .pending
                            .ceil_char_boundary(buffer.bytes_to_reveal_per_tick)
                            .min(pending_len);

                        buffer.target.update(cx, |markdown: &mut Markdown, cx| {
                            markdown.append(&buffer.pending[..byte_boundary], cx);
                            buffer.pending.drain(..byte_boundary);
                        });

                        true
                    })
                    // A failed update means the thread cannot be reached anymore; stop the task.
                    .unwrap_or(false);

                if !should_continue {
                    break;
                }
            }
        })
    }
1687
    /// Appends a new entry to the thread and emits `AcpThreadEvent::NewEntry`.
    ///
    /// Any buffered streaming text is flushed first, so the previous entry is
    /// fully revealed before the new one is added.
    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
1693
    /// Returns `true` when the connection provides a title setter for this
    /// session.
    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
        self.connection.set_title(&self.session_id, cx).is_some()
    }
1697
1698    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1699        let had_provisional = self.provisional_title.take().is_some();
1700        if self.title.as_ref() != Some(&title) {
1701            self.title = Some(title.clone());
1702            cx.emit(AcpThreadEvent::TitleUpdated);
1703            if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1704                return set_title.run(title, cx);
1705            }
1706        } else if had_provisional {
1707            cx.emit(AcpThreadEvent::TitleUpdated);
1708        }
1709        Task::ready(Ok(()))
1710    }
1711
    /// Sets a provisional display title without propagating back to the
    /// underlying agent connection. This is used for quick preview titles
    /// (e.g. first 20 chars of the user message) that should be shown
    /// immediately but replaced once the LLM generates a proper title via
    /// `set_title`.
    ///
    /// Always emits `AcpThreadEvent::TitleUpdated`.
    pub fn set_provisional_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
        self.provisional_title = Some(title);
        cx.emit(AcpThreadEvent::TitleUpdated);
    }
1721
    /// Emits `AcpThreadEvent::SubagentSpawned` for the given session; no
    /// thread state is changed.
    pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
    }
1725
    /// Replaces the stored token usage (`None` clears it) and emits
    /// `AcpThreadEvent::TokenUsageUpdated`.
    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
        self.token_usage = usage;
        cx.emit(AcpThreadEvent::TokenUsageUpdated);
    }
1730
    /// Emits `AcpThreadEvent::Retry` with the given status; the status is not
    /// stored on the thread.
    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::Retry(status));
    }
1734
1735    pub fn update_tool_call(
1736        &mut self,
1737        update: impl Into<ToolCallUpdate>,
1738        cx: &mut Context<Self>,
1739    ) -> Result<()> {
1740        let update = update.into();
1741        let languages = self.project.read(cx).languages().clone();
1742        let path_style = self.project.read(cx).path_style(cx);
1743
1744        let ix = match self.index_for_tool_call(update.id()) {
1745            Some(ix) => ix,
1746            None => {
1747                // Tool call not found - create a failed tool call entry
1748                let failed_tool_call = ToolCall {
1749                    id: update.id().clone(),
1750                    label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1751                    kind: acp::ToolKind::Fetch,
1752                    content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1753                        "Tool call not found".into(),
1754                        &languages,
1755                        path_style,
1756                        cx,
1757                    ))],
1758                    status: ToolCallStatus::Failed,
1759                    locations: Vec::new(),
1760                    resolved_locations: Vec::new(),
1761                    raw_input: None,
1762                    raw_input_markdown: None,
1763                    raw_output: None,
1764                    tool_name: None,
1765                    subagent_session_info: None,
1766                };
1767                self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1768                return Ok(());
1769            }
1770        };
1771        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1772            unreachable!()
1773        };
1774
1775        match update {
1776            ToolCallUpdate::UpdateFields(update) => {
1777                let location_updated = update.fields.locations.is_some();
1778                call.update_fields(
1779                    update.fields,
1780                    update.meta,
1781                    languages,
1782                    path_style,
1783                    &self.terminals,
1784                    cx,
1785                )?;
1786                if location_updated {
1787                    self.resolve_locations(update.tool_call_id, cx);
1788                }
1789            }
1790            ToolCallUpdate::UpdateDiff(update) => {
1791                call.content.clear();
1792                call.content.push(ToolCallContent::Diff(update.diff));
1793            }
1794            ToolCallUpdate::UpdateTerminal(update) => {
1795                call.content.clear();
1796                call.content
1797                    .push(ToolCallContent::Terminal(update.terminal));
1798            }
1799        }
1800
1801        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1802
1803        Ok(())
1804    }
1805
1806    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1807    pub fn upsert_tool_call(
1808        &mut self,
1809        tool_call: acp::ToolCall,
1810        cx: &mut Context<Self>,
1811    ) -> Result<(), acp::Error> {
1812        let status = tool_call.status.into();
1813        self.upsert_tool_call_inner(tool_call.into(), status, cx)
1814    }
1815
1816    /// Fails if id does not match an existing entry.
1817    pub fn upsert_tool_call_inner(
1818        &mut self,
1819        update: acp::ToolCallUpdate,
1820        status: ToolCallStatus,
1821        cx: &mut Context<Self>,
1822    ) -> Result<(), acp::Error> {
1823        let language_registry = self.project.read(cx).languages().clone();
1824        let path_style = self.project.read(cx).path_style(cx);
1825        let id = update.tool_call_id.clone();
1826
1827        let agent_telemetry_id = self.connection().telemetry_id();
1828        let session = self.session_id();
1829        let parent_session_id = self.parent_session_id();
1830        if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1831            let status = if matches!(status, ToolCallStatus::Completed) {
1832                "completed"
1833            } else {
1834                "failed"
1835            };
1836            telemetry::event!(
1837                "Agent Tool Call Completed",
1838                agent_telemetry_id,
1839                session,
1840                parent_session_id,
1841                status
1842            );
1843        }
1844
1845        if let Some(ix) = self.index_for_tool_call(&id) {
1846            let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1847                unreachable!()
1848            };
1849
1850            call.update_fields(
1851                update.fields,
1852                update.meta,
1853                language_registry,
1854                path_style,
1855                &self.terminals,
1856                cx,
1857            )?;
1858            call.status = status;
1859
1860            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1861        } else {
1862            let call = ToolCall::from_acp(
1863                update.try_into()?,
1864                status,
1865                language_registry,
1866                self.project.read(cx).path_style(cx),
1867                &self.terminals,
1868                cx,
1869            )?;
1870            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1871        };
1872
1873        self.resolve_locations(id, cx);
1874        Ok(())
1875    }
1876
1877    fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1878        self.entries
1879            .iter()
1880            .enumerate()
1881            .rev()
1882            .find_map(|(index, entry)| {
1883                if let AgentThreadEntry::ToolCall(tool_call) = entry
1884                    && &tool_call.id == id
1885                {
1886                    Some(index)
1887                } else {
1888                    None
1889                }
1890            })
1891    }
1892
1893    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1894        // The tool call we are looking for is typically the last one, or very close to the end.
1895        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1896        self.entries
1897            .iter_mut()
1898            .enumerate()
1899            .rev()
1900            .find_map(|(index, tool_call)| {
1901                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1902                    && &tool_call.id == id
1903                {
1904                    Some((index, tool_call))
1905                } else {
1906                    None
1907                }
1908            })
1909    }
1910
1911    pub fn tool_call(&self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1912        self.entries
1913            .iter()
1914            .enumerate()
1915            .rev()
1916            .find_map(|(index, tool_call)| {
1917                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1918                    && &tool_call.id == id
1919                {
1920                    Some((index, tool_call))
1921                } else {
1922                    None
1923                }
1924            })
1925    }
1926
1927    pub fn tool_call_for_subagent(&self, session_id: &acp::SessionId) -> Option<&ToolCall> {
1928        self.entries.iter().find_map(|entry| match entry {
1929            AgentThreadEntry::ToolCall(tool_call) => {
1930                if let Some(subagent_session_info) = &tool_call.subagent_session_info
1931                    && &subagent_session_info.session_id == session_id
1932                {
1933                    Some(tool_call)
1934                } else {
1935                    None
1936                }
1937            }
1938            _ => None,
1939        })
1940    }
1941
1942    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1943        let project = self.project.clone();
1944        let should_update_agent_location = self.parent_session_id.is_none();
1945        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1946            return;
1947        };
1948        let task = tool_call.resolve_locations(project, cx);
1949        cx.spawn(async move |this, cx| {
1950            let resolved_locations = task.await;
1951
1952            this.update(cx, |this, cx| {
1953                let project = this.project.clone();
1954
1955                for location in resolved_locations.iter().flatten() {
1956                    this.shared_buffers
1957                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1958                }
1959                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1960                    return;
1961                };
1962
1963                if let Some(Some(location)) = resolved_locations.last() {
1964                    project.update(cx, |project, cx| {
1965                        let should_ignore = if let Some(agent_location) = project
1966                            .agent_location()
1967                            .filter(|agent_location| agent_location.buffer == location.buffer)
1968                        {
1969                            let snapshot = location.buffer.read(cx).snapshot();
1970                            let old_position = agent_location.position.to_point(&snapshot);
1971                            let new_position = location.position.to_point(&snapshot);
1972
1973                            // ignore this so that when we get updates from the edit tool
1974                            // the position doesn't reset to the startof line
1975                            old_position.row == new_position.row
1976                                && old_position.column > new_position.column
1977                        } else {
1978                            false
1979                        };
1980                        if !should_ignore && should_update_agent_location {
1981                            project.set_agent_location(Some(location.into()), cx);
1982                        }
1983                    });
1984                }
1985
1986                let resolved_locations = resolved_locations
1987                    .iter()
1988                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
1989                    .collect::<Vec<_>>();
1990
1991                if tool_call.resolved_locations != resolved_locations {
1992                    tool_call.resolved_locations = resolved_locations;
1993                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1994                }
1995            })
1996        })
1997        .detach();
1998    }
1999
2000    pub fn request_tool_call_authorization(
2001        &mut self,
2002        tool_call: acp::ToolCallUpdate,
2003        options: PermissionOptions,
2004        cx: &mut Context<Self>,
2005    ) -> Result<Task<RequestPermissionOutcome>> {
2006        let (tx, rx) = oneshot::channel();
2007
2008        let status = ToolCallStatus::WaitingForConfirmation {
2009            options,
2010            respond_tx: tx,
2011        };
2012
2013        let tool_call_id = tool_call.tool_call_id.clone();
2014        self.upsert_tool_call_inner(tool_call, status, cx)?;
2015        cx.emit(AcpThreadEvent::ToolAuthorizationRequested(
2016            tool_call_id.clone(),
2017        ));
2018
2019        Ok(cx.spawn(async move |this, cx| {
2020            let outcome = match rx.await {
2021                Ok(outcome) => RequestPermissionOutcome::Selected(outcome),
2022                Err(oneshot::Canceled) => RequestPermissionOutcome::Cancelled,
2023            };
2024            this.update(cx, |_this, cx| {
2025                cx.emit(AcpThreadEvent::ToolAuthorizationReceived(tool_call_id))
2026            })
2027            .ok();
2028            outcome
2029        }))
2030    }
2031
2032    pub fn authorize_tool_call(
2033        &mut self,
2034        id: acp::ToolCallId,
2035        outcome: SelectedPermissionOutcome,
2036        cx: &mut Context<Self>,
2037    ) {
2038        let Some((ix, call)) = self.tool_call_mut(&id) else {
2039            return;
2040        };
2041
2042        let new_status = match outcome.option_kind {
2043            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
2044                ToolCallStatus::Rejected
2045            }
2046            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
2047                ToolCallStatus::InProgress
2048            }
2049            _ => ToolCallStatus::InProgress,
2050        };
2051
2052        let curr_status = mem::replace(&mut call.status, new_status);
2053
2054        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
2055            respond_tx.send(outcome).log_err();
2056        } else if cfg!(debug_assertions) {
2057            panic!("tried to authorize an already authorized tool call");
2058        }
2059
2060        cx.emit(AcpThreadEvent::EntryUpdated(ix));
2061    }
2062
2063    pub fn plan(&self) -> &Plan {
2064        &self.plan
2065    }
2066
2067    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
2068        let new_entries_len = request.entries.len();
2069        let mut new_entries = request.entries.into_iter();
2070
2071        // Reuse existing markdown to prevent flickering
2072        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
2073            let PlanEntry {
2074                content,
2075                priority,
2076                status,
2077            } = old;
2078            content.update(cx, |old, cx| {
2079                old.replace(new.content, cx);
2080            });
2081            *priority = new.priority;
2082            *status = new.status;
2083        }
2084        for new in new_entries {
2085            self.plan.entries.push(PlanEntry::from_acp(new, cx))
2086        }
2087        self.plan.entries.truncate(new_entries_len);
2088
2089        cx.notify();
2090    }
2091
2092    pub fn snapshot_completed_plan(&mut self, cx: &mut Context<Self>) {
2093        if !self.plan.is_empty() && self.plan.stats().pending == 0 {
2094            let completed_entries = std::mem::take(&mut self.plan.entries);
2095            self.push_entry(AgentThreadEntry::CompletedPlan(completed_entries), cx);
2096        }
2097    }
2098
2099    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
2100        self.plan
2101            .entries
2102            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
2103        cx.notify();
2104    }
2105
2106    pub fn clear_plan(&mut self, cx: &mut Context<Self>) {
2107        self.plan.entries.clear();
2108        cx.notify();
2109    }
2110
2111    #[cfg(any(test, feature = "test-support"))]
2112    pub fn send_raw(
2113        &mut self,
2114        message: &str,
2115        cx: &mut Context<Self>,
2116    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2117        self.send(vec![message.into()], cx)
2118    }
2119
2120    pub fn send(
2121        &mut self,
2122        message: Vec<acp::ContentBlock>,
2123        cx: &mut Context<Self>,
2124    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2125        let block = ContentBlock::new_combined(
2126            message.clone(),
2127            self.project.read(cx).languages().clone(),
2128            self.project.read(cx).path_style(cx),
2129            cx,
2130        );
2131        let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
2132        let git_store = self.project.read(cx).git_store().clone();
2133
2134        let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
2135            Some(UserMessageId::new())
2136        } else {
2137            None
2138        };
2139
2140        self.run_turn(cx, async move |this, cx| {
2141            this.update(cx, |this, cx| {
2142                this.push_entry(
2143                    AgentThreadEntry::UserMessage(UserMessage {
2144                        id: message_id.clone(),
2145                        content: block,
2146                        chunks: message,
2147                        checkpoint: None,
2148                        indented: false,
2149                    }),
2150                    cx,
2151                );
2152            })
2153            .ok();
2154
2155            let old_checkpoint = git_store
2156                .update(cx, |git, cx| git.checkpoint(cx))
2157                .await
2158                .context("failed to get old checkpoint")
2159                .log_err();
2160            this.update(cx, |this, cx| {
2161                if let Some((_ix, message)) = this.last_user_message() {
2162                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
2163                        git_checkpoint,
2164                        show: false,
2165                    });
2166                }
2167                this.connection.prompt(message_id, request, cx)
2168            })?
2169            .await
2170        })
2171    }
2172
2173    pub fn can_retry(&self, cx: &App) -> bool {
2174        self.connection.retry(&self.session_id, cx).is_some()
2175    }
2176
2177    pub fn retry(
2178        &mut self,
2179        cx: &mut Context<Self>,
2180    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2181        self.run_turn(cx, async move |this, cx| {
2182            this.update(cx, |this, cx| {
2183                this.connection
2184                    .retry(&this.session_id, cx)
2185                    .map(|retry| retry.run(cx))
2186            })?
2187            .context("retrying a session is not supported")?
2188            .await
2189        })
2190    }
2191
2192    fn run_turn(
2193        &mut self,
2194        cx: &mut Context<Self>,
2195        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
2196    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2197        self.clear_completed_plan_entries(cx);
2198        self.had_error = false;
2199
2200        let (tx, rx) = oneshot::channel();
2201        let cancel_task = self.cancel(cx);
2202
2203        self.turn_id += 1;
2204        let turn_id = self.turn_id;
2205        self.running_turn = Some(RunningTurn {
2206            id: turn_id,
2207            send_task: cx.spawn(async move |this, cx| {
2208                cancel_task.await;
2209                tx.send(f(this, cx).await).ok();
2210            }),
2211        });
2212
2213        cx.spawn(async move |this, cx| {
2214            let response = rx.await;
2215
2216            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
2217                .await?;
2218
2219            this.update(cx, |this, cx| {
2220                if this.parent_session_id.is_none() {
2221                    this.project
2222                        .update(cx, |project, cx| project.set_agent_location(None, cx));
2223                }
2224                let Ok(response) = response else {
2225                    // tx dropped, just return
2226                    return Ok(None);
2227                };
2228
2229                let is_same_turn = this
2230                    .running_turn
2231                    .as_ref()
2232                    .is_some_and(|turn| turn_id == turn.id);
2233
2234                // If the user submitted a follow up message, running_turn might
2235                // already point to a different turn. Therefore we only want to
2236                // take the task if it's the same turn.
2237                if is_same_turn {
2238                    this.running_turn.take();
2239                }
2240
2241                match response {
2242                    Ok(r) => {
2243                        Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2244
2245                        if r.stop_reason == acp::StopReason::MaxTokens {
2246                            this.had_error = true;
2247                            cx.emit(AcpThreadEvent::Error);
2248                            log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2249
2250                            let exceeded_max_output_tokens =
2251                                this.token_usage.as_ref().is_some_and(|u| {
2252                                    u.max_output_tokens
2253                                        .is_some_and(|max| u.output_tokens >= max)
2254                                });
2255
2256                            let message = if exceeded_max_output_tokens {
2257                                log::error!(
2258                                    "Max output tokens reached. Usage: {:?}",
2259                                    this.token_usage
2260                                );
2261                                "Maximum output tokens reached"
2262                            } else {
2263                                log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2264                                "Maximum tokens reached"
2265                            };
2266                            return Err(anyhow!(message));
2267                        }
2268
2269                        let canceled = matches!(r.stop_reason, acp::StopReason::Cancelled);
2270                        if canceled {
2271                            this.mark_pending_tools_as_canceled();
2272                        }
2273
2274                        if !canceled {
2275                            this.snapshot_completed_plan(cx);
2276                        }
2277
2278                        // Handle refusal - distinguish between user prompt and tool call refusals
2279                        if let acp::StopReason::Refusal = r.stop_reason {
2280                            this.had_error = true;
2281                            if let Some((user_msg_ix, _)) = this.last_user_message() {
2282                                // Check if there's a completed tool call with results after the last user message
2283                                // This indicates the refusal is in response to tool output, not the user's prompt
2284                                let has_completed_tool_call_after_user_msg =
2285                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
2286                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
2287                                            // Check if the tool call has completed and has output
2288                                            matches!(tool_call.status, ToolCallStatus::Completed)
2289                                                && tool_call.raw_output.is_some()
2290                                        } else {
2291                                            false
2292                                        }
2293                                    });
2294
2295                                if has_completed_tool_call_after_user_msg {
2296                                    // Refusal is due to tool output - don't truncate, just notify
2297                                    // The model refused based on what the tool returned
2298                                    cx.emit(AcpThreadEvent::Refusal);
2299                                } else {
2300                                    // User prompt was refused - truncate back to before the user message
2301                                    let range = user_msg_ix..this.entries.len();
2302                                    if range.start < range.end {
2303                                        this.entries.truncate(user_msg_ix);
2304                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
2305                                    }
2306                                    cx.emit(AcpThreadEvent::Refusal);
2307                                }
2308                            } else {
2309                                // No user message found, treat as general refusal
2310                                cx.emit(AcpThreadEvent::Refusal);
2311                            }
2312                        }
2313
2314                        cx.emit(AcpThreadEvent::Stopped(r.stop_reason));
2315                        Ok(Some(r))
2316                    }
2317                    Err(e) => {
2318                        Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2319
2320                        this.had_error = true;
2321                        cx.emit(AcpThreadEvent::Error);
2322                        log::error!("Error in run turn: {:?}", e);
2323                        Err(e)
2324                    }
2325                }
2326            })?
2327        })
2328        .boxed()
2329    }
2330
2331    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2332        let Some(turn) = self.running_turn.take() else {
2333            return Task::ready(());
2334        };
2335        self.connection.cancel(&self.session_id, cx);
2336
2337        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2338        self.mark_pending_tools_as_canceled();
2339
2340        // Wait for the send task to complete
2341        cx.background_spawn(turn.send_task)
2342    }
2343
2344    fn mark_pending_tools_as_canceled(&mut self) {
2345        for entry in self.entries.iter_mut() {
2346            if let AgentThreadEntry::ToolCall(call) = entry {
2347                let cancel = matches!(
2348                    call.status,
2349                    ToolCallStatus::Pending
2350                        | ToolCallStatus::WaitingForConfirmation { .. }
2351                        | ToolCallStatus::InProgress
2352                );
2353
2354                if cancel {
2355                    call.status = ToolCallStatus::Canceled;
2356                }
2357            }
2358        }
2359    }
2360
2361    /// Restores the git working tree to the state at the given checkpoint (if one exists)
2362    pub fn restore_checkpoint(
2363        &mut self,
2364        id: UserMessageId,
2365        cx: &mut Context<Self>,
2366    ) -> Task<Result<()>> {
2367        let Some((_, message)) = self.user_message_mut(&id) else {
2368            return Task::ready(Err(anyhow!("message not found")));
2369        };
2370
2371        let checkpoint = message
2372            .checkpoint
2373            .as_ref()
2374            .map(|c| c.git_checkpoint.clone());
2375
2376        // Cancel any in-progress generation before restoring
2377        let cancel_task = self.cancel(cx);
2378        let rewind = self.rewind(id.clone(), cx);
2379        let git_store = self.project.read(cx).git_store().clone();
2380
2381        cx.spawn(async move |_, cx| {
2382            cancel_task.await;
2383            rewind.await?;
2384            if let Some(checkpoint) = checkpoint {
2385                git_store
2386                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2387                    .await?;
2388            }
2389
2390            Ok(())
2391        })
2392    }
2393
2394    /// Rewinds this thread to before the entry at `index`, removing it and all
2395    /// subsequent entries while rejecting any action_log changes made from that point.
2396    /// Unlike `restore_checkpoint`, this method does not restore from git.
2397    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2398        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2399            return Task::ready(Err(anyhow!("not supported")));
2400        };
2401
2402        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2403        let telemetry = ActionLogTelemetry::from(&*self);
2404        cx.spawn(async move |this, cx| {
2405            cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2406            this.update(cx, |this, cx| {
2407                if let Some((ix, _)) = this.user_message_mut(&id) {
2408                    // Collect all terminals from entries that will be removed
2409                    let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2410                        .iter()
2411                        .flat_map(|entry| entry.terminals())
2412                        .filter_map(|terminal| terminal.read(cx).id().clone().into())
2413                        .collect();
2414
2415                    let range = ix..this.entries.len();
2416                    this.entries.truncate(ix);
2417                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
2418
2419                    // Kill and remove the terminals
2420                    for terminal_id in terminals_to_remove {
2421                        if let Some(terminal) = this.terminals.remove(&terminal_id) {
2422                            terminal.update(cx, |terminal, cx| {
2423                                terminal.kill(cx);
2424                            });
2425                        }
2426                    }
2427                }
2428                this.action_log().update(cx, |action_log, cx| {
2429                    action_log.reject_all_edits(Some(telemetry), cx)
2430                })
2431            })?
2432            .await;
2433            Ok(())
2434        })
2435    }
2436
2437    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2438        let git_store = self.project.read(cx).git_store().clone();
2439
2440        let Some((_, message)) = self.last_user_message() else {
2441            return Task::ready(Ok(()));
2442        };
2443        let Some(user_message_id) = message.id.clone() else {
2444            return Task::ready(Ok(()));
2445        };
2446        let Some(checkpoint) = message.checkpoint.as_ref() else {
2447            return Task::ready(Ok(()));
2448        };
2449        let old_checkpoint = checkpoint.git_checkpoint.clone();
2450
2451        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2452        cx.spawn(async move |this, cx| {
2453            let Some(new_checkpoint) = new_checkpoint
2454                .await
2455                .context("failed to get new checkpoint")
2456                .log_err()
2457            else {
2458                return Ok(());
2459            };
2460
2461            let equal = git_store
2462                .update(cx, |git, cx| {
2463                    git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2464                })
2465                .await
2466                .unwrap_or(true);
2467
2468            this.update(cx, |this, cx| {
2469                if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2470                    if let Some(checkpoint) = message.checkpoint.as_mut() {
2471                        checkpoint.show = !equal;
2472                        cx.emit(AcpThreadEvent::EntryUpdated(ix));
2473                    }
2474                }
2475            })?;
2476
2477            Ok(())
2478        })
2479    }
2480
2481    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2482        self.entries
2483            .iter_mut()
2484            .enumerate()
2485            .rev()
2486            .find_map(|(ix, entry)| {
2487                if let AgentThreadEntry::UserMessage(message) = entry {
2488                    Some((ix, message))
2489                } else {
2490                    None
2491                }
2492            })
2493    }
2494
2495    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2496        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2497            if let AgentThreadEntry::UserMessage(message) = entry {
2498                if message.id.as_ref() == Some(id) {
2499                    Some((ix, message))
2500                } else {
2501                    None
2502                }
2503            } else {
2504                None
2505            }
2506        })
2507    }
2508
2509    pub fn read_text_file(
2510        &self,
2511        path: PathBuf,
2512        line: Option<u32>,
2513        limit: Option<u32>,
2514        reuse_shared_snapshot: bool,
2515        cx: &mut Context<Self>,
2516    ) -> Task<Result<String, acp::Error>> {
2517        // Args are 1-based, move to 0-based
2518        let line = line.unwrap_or_default().saturating_sub(1);
2519        let limit = limit.unwrap_or(u32::MAX);
2520        let project = self.project.clone();
2521        let action_log = self.action_log.clone();
2522        let should_update_agent_location = self.parent_session_id.is_none();
2523        cx.spawn(async move |this, cx| {
2524            let load = project.update(cx, |project, cx| {
2525                let path = project
2526                    .project_path_for_absolute_path(&path, cx)
2527                    .ok_or_else(|| {
2528                        acp::Error::resource_not_found(Some(path.display().to_string()))
2529                    })?;
2530                Ok::<_, acp::Error>(project.open_buffer(path, cx))
2531            })?;
2532
2533            let buffer = load.await?;
2534
2535            let snapshot = if reuse_shared_snapshot {
2536                this.read_with(cx, |this, _| {
2537                    this.shared_buffers.get(&buffer.clone()).cloned()
2538                })
2539                .log_err()
2540                .flatten()
2541            } else {
2542                None
2543            };
2544
2545            let snapshot = if let Some(snapshot) = snapshot {
2546                snapshot
2547            } else {
2548                action_log.update(cx, |action_log, cx| {
2549                    action_log.buffer_read(buffer.clone(), cx);
2550                });
2551
2552                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
2553                this.update(cx, |this, _| {
2554                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2555                })?;
2556                snapshot
2557            };
2558
2559            let max_point = snapshot.max_point();
2560            let start_position = Point::new(line, 0);
2561
2562            if start_position > max_point {
2563                return Err(acp::Error::invalid_params().data(format!(
2564                    "Attempting to read beyond the end of the file, line {}:{}",
2565                    max_point.row + 1,
2566                    max_point.column
2567                )));
2568            }
2569
2570            let start = snapshot.anchor_before(start_position);
2571            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2572
2573            if should_update_agent_location {
2574                project.update(cx, |project, cx| {
2575                    project.set_agent_location(
2576                        Some(AgentLocation {
2577                            buffer: buffer.downgrade(),
2578                            position: start,
2579                        }),
2580                        cx,
2581                    );
2582                });
2583            }
2584
2585            Ok(snapshot.text_for_range(start..end).collect::<String>())
2586        })
2587    }
2588
2589    pub fn write_text_file(
2590        &self,
2591        path: PathBuf,
2592        content: String,
2593        cx: &mut Context<Self>,
2594    ) -> Task<Result<()>> {
2595        let project = self.project.clone();
2596        let action_log = self.action_log.clone();
2597        let should_update_agent_location = self.parent_session_id.is_none();
2598        cx.spawn(async move |this, cx| {
2599            let load = project.update(cx, |project, cx| {
2600                let path = project
2601                    .project_path_for_absolute_path(&path, cx)
2602                    .context("invalid path")?;
2603                anyhow::Ok(project.open_buffer(path, cx))
2604            });
2605            let buffer = load?.await?;
2606            let snapshot = this.update(cx, |this, cx| {
2607                this.shared_buffers
2608                    .get(&buffer)
2609                    .cloned()
2610                    .unwrap_or_else(|| buffer.read(cx).snapshot())
2611            })?;
2612            let edits = cx
2613                .background_executor()
2614                .spawn(async move {
2615                    let old_text = snapshot.text();
2616                    text_diff(old_text.as_str(), &content)
2617                        .into_iter()
2618                        .map(|(range, replacement)| {
2619                            (snapshot.anchor_range_around(range), replacement)
2620                        })
2621                        .collect::<Vec<_>>()
2622                })
2623                .await;
2624
2625            if should_update_agent_location {
2626                project.update(cx, |project, cx| {
2627                    project.set_agent_location(
2628                        Some(AgentLocation {
2629                            buffer: buffer.downgrade(),
2630                            position: edits
2631                                .last()
2632                                .map(|(range, _)| range.end)
2633                                .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2634                        }),
2635                        cx,
2636                    );
2637                });
2638            }
2639
2640            let format_on_save = cx.update(|cx| {
2641                action_log.update(cx, |action_log, cx| {
2642                    action_log.buffer_read(buffer.clone(), cx);
2643                });
2644
2645                let format_on_save = buffer.update(cx, |buffer, cx| {
2646                    buffer.edit(edits, None, cx);
2647
2648                    let settings =
2649                        language::language_settings::LanguageSettings::for_buffer(buffer, cx);
2650
2651                    settings.format_on_save != FormatOnSave::Off
2652                });
2653                action_log.update(cx, |action_log, cx| {
2654                    action_log.buffer_edited(buffer.clone(), cx);
2655                });
2656                format_on_save
2657            });
2658
2659            if format_on_save {
2660                let format_task = project.update(cx, |project, cx| {
2661                    project.format(
2662                        HashSet::from_iter([buffer.clone()]),
2663                        LspFormatTarget::Buffers,
2664                        false,
2665                        FormatTrigger::Save,
2666                        cx,
2667                    )
2668                });
2669                format_task.await.log_err();
2670
2671                action_log.update(cx, |action_log, cx| {
2672                    action_log.buffer_edited(buffer.clone(), cx);
2673                });
2674            }
2675
2676            project
2677                .update(cx, |project, cx| project.save_buffer(buffer, cx))
2678                .await
2679        })
2680    }
2681
2682    pub fn create_terminal(
2683        &self,
2684        command: String,
2685        args: Vec<String>,
2686        extra_env: Vec<acp::EnvVariable>,
2687        cwd: Option<PathBuf>,
2688        output_byte_limit: Option<u64>,
2689        cx: &mut Context<Self>,
2690    ) -> Task<Result<Entity<Terminal>>> {
2691        let env = match &cwd {
2692            Some(dir) => self.project.update(cx, |project, cx| {
2693                project.environment().update(cx, |env, cx| {
2694                    env.directory_environment(dir.as_path().into(), cx)
2695                })
2696            }),
2697            None => Task::ready(None).shared(),
2698        };
2699        let env = cx.spawn(async move |_, _| {
2700            let mut env = env.await.unwrap_or_default();
2701            // Disables paging for `git` and hopefully other commands
2702            env.insert("PAGER".into(), "".into());
2703            for var in extra_env {
2704                env.insert(var.name, var.value);
2705            }
2706            env
2707        });
2708
2709        let project = self.project.clone();
2710        let language_registry = project.read(cx).languages().clone();
2711        let is_windows = project.read(cx).path_style(cx).is_windows();
2712
2713        let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2714        let terminal_task = cx.spawn({
2715            let terminal_id = terminal_id.clone();
2716            async move |_this, cx| {
2717                let env = env.await;
2718                let shell = project
2719                    .update(cx, |project, cx| {
2720                        project
2721                            .remote_client()
2722                            .and_then(|r| r.read(cx).default_system_shell())
2723                    })
2724                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2725                let (task_command, task_args) =
2726                    ShellBuilder::new(&Shell::Program(shell), is_windows)
2727                        .redirect_stdin_to_dev_null()
2728                        .build(Some(command.clone()), &args);
2729                let terminal = project
2730                    .update(cx, |project, cx| {
2731                        project.create_terminal_task(
2732                            task::SpawnInTerminal {
2733                                command: Some(task_command),
2734                                args: task_args,
2735                                cwd: cwd.clone(),
2736                                env,
2737                                ..Default::default()
2738                            },
2739                            cx,
2740                        )
2741                    })
2742                    .await?;
2743
2744                anyhow::Ok(cx.new(|cx| {
2745                    Terminal::new(
2746                        terminal_id,
2747                        &format!("{} {}", command, args.join(" ")),
2748                        cwd,
2749                        output_byte_limit.map(|l| l as usize),
2750                        terminal,
2751                        language_registry,
2752                        cx,
2753                    )
2754                }))
2755            }
2756        });
2757
2758        cx.spawn(async move |this, cx| {
2759            let terminal = terminal_task.await?;
2760            this.update(cx, |this, _cx| {
2761                this.terminals.insert(terminal_id, terminal.clone());
2762                terminal
2763            })
2764        })
2765    }
2766
2767    pub fn kill_terminal(
2768        &mut self,
2769        terminal_id: acp::TerminalId,
2770        cx: &mut Context<Self>,
2771    ) -> Result<()> {
2772        self.terminals
2773            .get(&terminal_id)
2774            .context("Terminal not found")?
2775            .update(cx, |terminal, cx| {
2776                terminal.kill(cx);
2777            });
2778
2779        Ok(())
2780    }
2781
2782    pub fn release_terminal(
2783        &mut self,
2784        terminal_id: acp::TerminalId,
2785        cx: &mut Context<Self>,
2786    ) -> Result<()> {
2787        self.terminals
2788            .remove(&terminal_id)
2789            .context("Terminal not found")?
2790            .update(cx, |terminal, cx| {
2791                terminal.kill(cx);
2792            });
2793
2794        Ok(())
2795    }
2796
2797    pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2798        self.terminals
2799            .get(&terminal_id)
2800            .context("Terminal not found")
2801            .cloned()
2802    }
2803
2804    pub fn to_markdown(&self, cx: &App) -> String {
2805        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2806    }
2807
2808    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2809        cx.emit(AcpThreadEvent::LoadError(error));
2810    }
2811
2812    pub fn register_terminal_created(
2813        &mut self,
2814        terminal_id: acp::TerminalId,
2815        command_label: String,
2816        working_dir: Option<PathBuf>,
2817        output_byte_limit: Option<u64>,
2818        terminal: Entity<::terminal::Terminal>,
2819        cx: &mut Context<Self>,
2820    ) -> Entity<Terminal> {
2821        let language_registry = self.project.read(cx).languages().clone();
2822
2823        let entity = cx.new(|cx| {
2824            Terminal::new(
2825                terminal_id.clone(),
2826                &command_label,
2827                working_dir.clone(),
2828                output_byte_limit.map(|l| l as usize),
2829                terminal,
2830                language_registry,
2831                cx,
2832            )
2833        });
2834        self.terminals.insert(terminal_id.clone(), entity.clone());
2835        entity
2836    }
2837
2838    pub fn mark_as_subagent_output(&mut self, cx: &mut Context<Self>) {
2839        for entry in self.entries.iter_mut().rev() {
2840            if let AgentThreadEntry::AssistantMessage(assistant_message) = entry {
2841                assistant_message.is_subagent_output = true;
2842                cx.notify();
2843                return;
2844            }
2845        }
2846    }
2847
2848    pub fn on_terminal_provider_event(
2849        &mut self,
2850        event: TerminalProviderEvent,
2851        cx: &mut Context<Self>,
2852    ) {
2853        match event {
2854            TerminalProviderEvent::Created {
2855                terminal_id,
2856                label,
2857                cwd,
2858                output_byte_limit,
2859                terminal,
2860            } => {
2861                let entity = self.register_terminal_created(
2862                    terminal_id.clone(),
2863                    label,
2864                    cwd,
2865                    output_byte_limit,
2866                    terminal,
2867                    cx,
2868                );
2869
2870                if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
2871                    for data in chunks.drain(..) {
2872                        entity.update(cx, |term, cx| {
2873                            term.inner().update(cx, |inner, cx| {
2874                                inner.write_output(&data, cx);
2875                            })
2876                        });
2877                    }
2878                }
2879
2880                if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
2881                    entity.update(cx, |_term, cx| {
2882                        cx.notify();
2883                    });
2884                }
2885
2886                cx.notify();
2887            }
2888            TerminalProviderEvent::Output { terminal_id, data } => {
2889                if let Some(entity) = self.terminals.get(&terminal_id) {
2890                    entity.update(cx, |term, cx| {
2891                        term.inner().update(cx, |inner, cx| {
2892                            inner.write_output(&data, cx);
2893                        })
2894                    });
2895                } else {
2896                    self.pending_terminal_output
2897                        .entry(terminal_id)
2898                        .or_default()
2899                        .push(data);
2900                }
2901            }
2902            TerminalProviderEvent::TitleChanged { terminal_id, title } => {
2903                if let Some(entity) = self.terminals.get(&terminal_id) {
2904                    entity.update(cx, |term, cx| {
2905                        term.inner().update(cx, |inner, cx| {
2906                            inner.breadcrumb_text = title;
2907                            cx.emit(::terminal::Event::BreadcrumbsChanged);
2908                        })
2909                    });
2910                }
2911            }
2912            TerminalProviderEvent::Exit {
2913                terminal_id,
2914                status,
2915            } => {
2916                if let Some(entity) = self.terminals.get(&terminal_id) {
2917                    entity.update(cx, |_term, cx| {
2918                        cx.notify();
2919                    });
2920                } else {
2921                    self.pending_terminal_exit.insert(terminal_id, status);
2922                }
2923            }
2924        }
2925    }
2926}
2927
2928fn markdown_for_raw_output(
2929    raw_output: &serde_json::Value,
2930    language_registry: &Arc<LanguageRegistry>,
2931    cx: &mut App,
2932) -> Option<Entity<Markdown>> {
2933    match raw_output {
2934        serde_json::Value::Null => None,
2935        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2936            Markdown::new(
2937                value.to_string().into(),
2938                Some(language_registry.clone()),
2939                None,
2940                cx,
2941            )
2942        })),
2943        serde_json::Value::Number(value) => Some(cx.new(|cx| {
2944            Markdown::new(
2945                value.to_string().into(),
2946                Some(language_registry.clone()),
2947                None,
2948                cx,
2949            )
2950        })),
2951        serde_json::Value::String(value) => Some(cx.new(|cx| {
2952            Markdown::new(
2953                value.clone().into(),
2954                Some(language_registry.clone()),
2955                None,
2956                cx,
2957            )
2958        })),
2959        value => Some(cx.new(|cx| {
2960            let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
2961
2962            Markdown::new(
2963                format!("```json\n{}\n```", pretty_json).into(),
2964                Some(language_registry.clone()),
2965                None,
2966                cx,
2967            )
2968        })),
2969    }
2970}
2971
2972#[cfg(test)]
2973mod tests {
2974    use super::*;
2975    use anyhow::anyhow;
2976    use futures::{channel::mpsc, future::LocalBoxFuture, select};
2977    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2978    use indoc::indoc;
2979    use project::{AgentId, FakeFs, Fs};
2980    use rand::{distr, prelude::*};
2981    use serde_json::json;
2982    use settings::SettingsStore;
2983    use smol::stream::StreamExt as _;
2984    use std::{
2985        any::Any,
2986        cell::RefCell,
2987        path::Path,
2988        rc::Rc,
2989        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2990        time::Duration,
2991    };
2992    use util::{path, path_list::PathList};
2993
2994    fn init_test(cx: &mut TestAppContext) {
2995        env_logger::try_init().ok();
2996        cx.update(|cx| {
2997            let settings_store = SettingsStore::test(cx);
2998            cx.set_global(settings_store);
2999        });
3000    }
3001
3002    #[gpui::test]
3003    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
3004        init_test(cx);
3005
3006        let fs = FakeFs::new(cx.executor());
3007        let project = Project::test(fs, [], cx).await;
3008        let connection = Rc::new(FakeAgentConnection::new());
3009        let thread = cx
3010            .update(|cx| {
3011                connection.new_session(
3012                    project,
3013                    PathList::new(&[std::path::Path::new(path!("/test"))]),
3014                    cx,
3015                )
3016            })
3017            .await
3018            .unwrap();
3019
3020        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3021
3022        // Send Output BEFORE Created - should be buffered by acp_thread
3023        thread.update(cx, |thread, cx| {
3024            thread.on_terminal_provider_event(
3025                TerminalProviderEvent::Output {
3026                    terminal_id: terminal_id.clone(),
3027                    data: b"hello buffered".to_vec(),
3028                },
3029                cx,
3030            );
3031        });
3032
3033        // Create a display-only terminal and then send Created
3034        let lower = cx.new(|cx| {
3035            let builder = ::terminal::TerminalBuilder::new_display_only(
3036                ::terminal::terminal_settings::CursorShape::default(),
3037                ::terminal::terminal_settings::AlternateScroll::On,
3038                None,
3039                0,
3040                cx.background_executor(),
3041                PathStyle::local(),
3042            )
3043            .unwrap();
3044            builder.subscribe(cx)
3045        });
3046
3047        thread.update(cx, |thread, cx| {
3048            thread.on_terminal_provider_event(
3049                TerminalProviderEvent::Created {
3050                    terminal_id: terminal_id.clone(),
3051                    label: "Buffered Test".to_string(),
3052                    cwd: None,
3053                    output_byte_limit: None,
3054                    terminal: lower.clone(),
3055                },
3056                cx,
3057            );
3058        });
3059
3060        // After Created, buffered Output should have been flushed into the renderer
3061        let content = thread.read_with(cx, |thread, cx| {
3062            let term = thread.terminal(terminal_id.clone()).unwrap();
3063            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3064        });
3065
3066        assert!(
3067            content.contains("hello buffered"),
3068            "expected buffered output to render, got: {content}"
3069        );
3070    }
3071
3072    #[gpui::test]
3073    async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
3074        init_test(cx);
3075
3076        let fs = FakeFs::new(cx.executor());
3077        let project = Project::test(fs, [], cx).await;
3078        let connection = Rc::new(FakeAgentConnection::new());
3079        let thread = cx
3080            .update(|cx| {
3081                connection.new_session(
3082                    project,
3083                    PathList::new(&[std::path::Path::new(path!("/test"))]),
3084                    cx,
3085                )
3086            })
3087            .await
3088            .unwrap();
3089
3090        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3091
3092        // Send Output BEFORE Created
3093        thread.update(cx, |thread, cx| {
3094            thread.on_terminal_provider_event(
3095                TerminalProviderEvent::Output {
3096                    terminal_id: terminal_id.clone(),
3097                    data: b"pre-exit data".to_vec(),
3098                },
3099                cx,
3100            );
3101        });
3102
3103        // Send Exit BEFORE Created
3104        thread.update(cx, |thread, cx| {
3105            thread.on_terminal_provider_event(
3106                TerminalProviderEvent::Exit {
3107                    terminal_id: terminal_id.clone(),
3108                    status: acp::TerminalExitStatus::new().exit_code(0),
3109                },
3110                cx,
3111            );
3112        });
3113
3114        // Now create a display-only lower-level terminal and send Created
3115        let lower = cx.new(|cx| {
3116            let builder = ::terminal::TerminalBuilder::new_display_only(
3117                ::terminal::terminal_settings::CursorShape::default(),
3118                ::terminal::terminal_settings::AlternateScroll::On,
3119                None,
3120                0,
3121                cx.background_executor(),
3122                PathStyle::local(),
3123            )
3124            .unwrap();
3125            builder.subscribe(cx)
3126        });
3127
3128        thread.update(cx, |thread, cx| {
3129            thread.on_terminal_provider_event(
3130                TerminalProviderEvent::Created {
3131                    terminal_id: terminal_id.clone(),
3132                    label: "Buffered Exit Test".to_string(),
3133                    cwd: None,
3134                    output_byte_limit: None,
3135                    terminal: lower.clone(),
3136                },
3137                cx,
3138            );
3139        });
3140
3141        // Output should be present after Created (flushed from buffer)
3142        let content = thread.read_with(cx, |thread, cx| {
3143            let term = thread.terminal(terminal_id.clone()).unwrap();
3144            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3145        });
3146
3147        assert!(
3148            content.contains("pre-exit data"),
3149            "expected pre-exit data to render, got: {content}"
3150        );
3151    }
3152
3153    /// Test that killing a terminal via Terminal::kill properly:
3154    /// 1. Causes wait_for_exit to complete (doesn't hang forever)
3155    /// 2. The underlying terminal still has the output that was written before the kill
3156    ///
3157    /// This test verifies that the fix to kill_active_task (which now also kills
3158    /// the shell process in addition to the foreground process) properly allows
3159    /// wait_for_exit to complete instead of hanging indefinitely.
3160    #[cfg(unix)]
3161    #[gpui::test]
3162    async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
3163        use std::collections::HashMap;
3164        use task::Shell;
3165        use util::shell_builder::ShellBuilder;
3166
3167        init_test(cx);
3168        cx.executor().allow_parking();
3169
3170        let fs = FakeFs::new(cx.executor());
3171        let project = Project::test(fs, [], cx).await;
3172        let connection = Rc::new(FakeAgentConnection::new());
3173        let thread = cx
3174            .update(|cx| {
3175                connection.new_session(
3176                    project.clone(),
3177                    PathList::new(&[Path::new(path!("/test"))]),
3178                    cx,
3179                )
3180            })
3181            .await
3182            .unwrap();
3183
3184        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3185
3186        // Create a real PTY terminal that runs a command which prints output then sleeps
3187        // We use printf instead of echo and chain with && sleep to ensure proper execution
3188        let (completion_tx, _completion_rx) = smol::channel::unbounded();
3189        let (program, args) = ShellBuilder::new(&Shell::System, false).build(
3190            Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
3191            &[],
3192        );
3193
3194        let builder = cx
3195            .update(|cx| {
3196                ::terminal::TerminalBuilder::new(
3197                    None,
3198                    None,
3199                    task::Shell::WithArguments {
3200                        program,
3201                        args,
3202                        title_override: None,
3203                    },
3204                    HashMap::default(),
3205                    ::terminal::terminal_settings::CursorShape::default(),
3206                    ::terminal::terminal_settings::AlternateScroll::On,
3207                    None,
3208                    vec![],
3209                    0,
3210                    false,
3211                    0,
3212                    Some(completion_tx),
3213                    cx,
3214                    vec![],
3215                    PathStyle::local(),
3216                )
3217            })
3218            .await
3219            .unwrap();
3220
3221        let lower_terminal = cx.new(|cx| builder.subscribe(cx));
3222
3223        // Create the acp_thread Terminal wrapper
3224        thread.update(cx, |thread, cx| {
3225            thread.on_terminal_provider_event(
3226                TerminalProviderEvent::Created {
3227                    terminal_id: terminal_id.clone(),
3228                    label: "printf output_before_kill && sleep 60".to_string(),
3229                    cwd: None,
3230                    output_byte_limit: None,
3231                    terminal: lower_terminal.clone(),
3232                },
3233                cx,
3234            );
3235        });
3236
3237        // Poll until the printf command produces output, rather than using a
3238        // fixed sleep which is flaky on loaded machines.
3239        let deadline = std::time::Instant::now() + Duration::from_secs(10);
3240        loop {
3241            let has_output = thread.read_with(cx, |thread, cx| {
3242                let term = thread
3243                    .terminals
3244                    .get(&terminal_id)
3245                    .expect("terminal not found");
3246                let content = term.read(cx).inner().read(cx).get_content();
3247                content.contains("output_before_kill")
3248            });
3249            if has_output {
3250                break;
3251            }
3252            assert!(
3253                std::time::Instant::now() < deadline,
3254                "Timed out waiting for printf output to appear in terminal",
3255            );
3256            cx.executor().timer(Duration::from_millis(50)).await;
3257        }
3258
3259        // Get the acp_thread Terminal and kill it
3260        let wait_for_exit = thread.update(cx, |thread, cx| {
3261            let term = thread.terminals.get(&terminal_id).unwrap();
3262            let wait_for_exit = term.read(cx).wait_for_exit();
3263            term.update(cx, |term, cx| {
3264                term.kill(cx);
3265            });
3266            wait_for_exit
3267        });
3268
3269        // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
3270        // Before the fix to kill_active_task, this would hang forever because
3271        // only the foreground process was killed, not the shell, so the PTY
3272        // child never exited and wait_for_completed_task never completed.
3273        let exit_result = futures::select! {
3274            result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
3275            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
3276        };
3277
3278        assert!(
3279            exit_result.is_some(),
3280            "wait_for_exit should complete after kill, but it timed out. \
3281            This indicates kill_active_task is not properly killing the shell process."
3282        );
3283
3284        // Give the system a chance to process any pending updates
3285        cx.run_until_parked();
3286
3287        // Verify that the underlying terminal still has the output that was
3288        // written before the kill. This verifies that killing doesn't lose output.
3289        let inner_content = thread.read_with(cx, |thread, cx| {
3290            let term = thread.terminals.get(&terminal_id).unwrap();
3291            term.read(cx).inner().read(cx).get_content()
3292        });
3293
3294        assert!(
3295            inner_content.contains("output_before_kill"),
3296            "Underlying terminal should contain output from before kill, got: {}",
3297            inner_content
3298        );
3299    }
3300
3301    #[gpui::test]
3302    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
3303        init_test(cx);
3304
3305        let fs = FakeFs::new(cx.executor());
3306        let project = Project::test(fs, [], cx).await;
3307        let connection = Rc::new(FakeAgentConnection::new());
3308        let thread = cx
3309            .update(|cx| {
3310                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3311            })
3312            .await
3313            .unwrap();
3314
3315        // Test creating a new user message
3316        thread.update(cx, |thread, cx| {
3317            thread.push_user_content_block(None, "Hello, ".into(), cx);
3318        });
3319
3320        thread.update(cx, |thread, cx| {
3321            assert_eq!(thread.entries.len(), 1);
3322            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3323                assert_eq!(user_msg.id, None);
3324                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
3325            } else {
3326                panic!("Expected UserMessage");
3327            }
3328        });
3329
3330        // Test appending to existing user message
3331        let message_1_id = UserMessageId::new();
3332        thread.update(cx, |thread, cx| {
3333            thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
3334        });
3335
3336        thread.update(cx, |thread, cx| {
3337            assert_eq!(thread.entries.len(), 1);
3338            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3339                assert_eq!(user_msg.id, Some(message_1_id));
3340                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
3341            } else {
3342                panic!("Expected UserMessage");
3343            }
3344        });
3345
3346        // Test creating new user message after assistant message
3347        thread.update(cx, |thread, cx| {
3348            thread.push_assistant_content_block("Assistant response".into(), false, cx);
3349        });
3350
3351        let message_2_id = UserMessageId::new();
3352        thread.update(cx, |thread, cx| {
3353            thread.push_user_content_block(
3354                Some(message_2_id.clone()),
3355                "New user message".into(),
3356                cx,
3357            );
3358        });
3359
3360        thread.update(cx, |thread, cx| {
3361            assert_eq!(thread.entries.len(), 3);
3362            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
3363                assert_eq!(user_msg.id, Some(message_2_id));
3364                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
3365            } else {
3366                panic!("Expected UserMessage at index 2");
3367            }
3368        });
3369    }
3370
3371    #[gpui::test]
3372    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
3373        init_test(cx);
3374
3375        let fs = FakeFs::new(cx.executor());
3376        let project = Project::test(fs, [], cx).await;
3377        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3378            |_, thread, mut cx| {
3379                async move {
3380                    thread.update(&mut cx, |thread, cx| {
3381                        thread
3382                            .handle_session_update(
3383                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3384                                    "Thinking ".into(),
3385                                )),
3386                                cx,
3387                            )
3388                            .unwrap();
3389                        thread
3390                            .handle_session_update(
3391                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3392                                    "hard!".into(),
3393                                )),
3394                                cx,
3395                            )
3396                            .unwrap();
3397                    })?;
3398                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3399                }
3400                .boxed_local()
3401            },
3402        ));
3403
3404        let thread = cx
3405            .update(|cx| {
3406                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3407            })
3408            .await
3409            .unwrap();
3410
3411        thread
3412            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3413            .await
3414            .unwrap();
3415
3416        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3417        assert_eq!(
3418            output,
3419            indoc! {r#"
3420            ## User
3421
3422            Hello from Zed!
3423
3424            ## Assistant
3425
3426            <thinking>
3427            Thinking hard!
3428            </thinking>
3429
3430            "#}
3431        );
3432    }
3433
3434    #[gpui::test]
3435    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
3436        init_test(cx);
3437
3438        let fs = FakeFs::new(cx.executor());
3439        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3440            .await;
3441        let project = Project::test(fs.clone(), [], cx).await;
3442        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3443        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3444        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3445            move |_, thread, mut cx| {
3446                let read_file_tx = read_file_tx.clone();
3447                async move {
3448                    let content = thread
3449                        .update(&mut cx, |thread, cx| {
3450                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3451                        })
3452                        .unwrap()
3453                        .await
3454                        .unwrap();
3455                    assert_eq!(content, "one\ntwo\nthree\n");
3456                    read_file_tx.take().unwrap().send(()).unwrap();
3457                    thread
3458                        .update(&mut cx, |thread, cx| {
3459                            thread.write_text_file(
3460                                path!("/tmp/foo").into(),
3461                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
3462                                cx,
3463                            )
3464                        })
3465                        .unwrap()
3466                        .await
3467                        .unwrap();
3468                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3469                }
3470                .boxed_local()
3471            },
3472        ));
3473
3474        let (worktree, pathbuf) = project
3475            .update(cx, |project, cx| {
3476                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3477            })
3478            .await
3479            .unwrap();
3480        let buffer = project
3481            .update(cx, |project, cx| {
3482                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3483            })
3484            .await
3485            .unwrap();
3486
3487        let thread = cx
3488            .update(|cx| {
3489                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3490            })
3491            .await
3492            .unwrap();
3493
3494        let request = thread.update(cx, |thread, cx| {
3495            thread.send_raw("Extend the count in /tmp/foo", cx)
3496        });
3497        read_file_rx.await.ok();
3498        buffer.update(cx, |buffer, cx| {
3499            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3500        });
3501        cx.run_until_parked();
3502        assert_eq!(
3503            buffer.read_with(cx, |buffer, _| buffer.text()),
3504            "zero\none\ntwo\nthree\nfour\nfive\n"
3505        );
3506        assert_eq!(
3507            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3508            "zero\none\ntwo\nthree\nfour\nfive\n"
3509        );
3510        request.await.unwrap();
3511    }
3512
3513    #[gpui::test]
3514    async fn test_reading_from_line(cx: &mut TestAppContext) {
3515        init_test(cx);
3516
3517        let fs = FakeFs::new(cx.executor());
3518        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3519            .await;
3520        let project = Project::test(fs.clone(), [], cx).await;
3521        project
3522            .update(cx, |project, cx| {
3523                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3524            })
3525            .await
3526            .unwrap();
3527
3528        let connection = Rc::new(FakeAgentConnection::new());
3529
3530        let thread = cx
3531            .update(|cx| {
3532                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3533            })
3534            .await
3535            .unwrap();
3536
3537        // Whole file
3538        let content = thread
3539            .update(cx, |thread, cx| {
3540                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3541            })
3542            .await
3543            .unwrap();
3544
3545        assert_eq!(content, "one\ntwo\nthree\nfour\n");
3546
3547        // Only start line
3548        let content = thread
3549            .update(cx, |thread, cx| {
3550                thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3551            })
3552            .await
3553            .unwrap();
3554
3555        assert_eq!(content, "three\nfour\n");
3556
3557        // Only limit
3558        let content = thread
3559            .update(cx, |thread, cx| {
3560                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3561            })
3562            .await
3563            .unwrap();
3564
3565        assert_eq!(content, "one\ntwo\n");
3566
3567        // Range
3568        let content = thread
3569            .update(cx, |thread, cx| {
3570                thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3571            })
3572            .await
3573            .unwrap();
3574
3575        assert_eq!(content, "two\nthree\n");
3576
3577        // Invalid
3578        let err = thread
3579            .update(cx, |thread, cx| {
3580                thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3581            })
3582            .await
3583            .unwrap_err();
3584
3585        assert_eq!(
3586            err.to_string(),
3587            "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3588        );
3589    }
3590
3591    #[gpui::test]
3592    async fn test_reading_empty_file(cx: &mut TestAppContext) {
3593        init_test(cx);
3594
3595        let fs = FakeFs::new(cx.executor());
3596        fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3597        let project = Project::test(fs.clone(), [], cx).await;
3598        project
3599            .update(cx, |project, cx| {
3600                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3601            })
3602            .await
3603            .unwrap();
3604
3605        let connection = Rc::new(FakeAgentConnection::new());
3606
3607        let thread = cx
3608            .update(|cx| {
3609                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3610            })
3611            .await
3612            .unwrap();
3613
3614        // Whole file
3615        let content = thread
3616            .update(cx, |thread, cx| {
3617                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3618            })
3619            .await
3620            .unwrap();
3621
3622        assert_eq!(content, "");
3623
3624        // Only start line
3625        let content = thread
3626            .update(cx, |thread, cx| {
3627                thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3628            })
3629            .await
3630            .unwrap();
3631
3632        assert_eq!(content, "");
3633
3634        // Only limit
3635        let content = thread
3636            .update(cx, |thread, cx| {
3637                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3638            })
3639            .await
3640            .unwrap();
3641
3642        assert_eq!(content, "");
3643
3644        // Range
3645        let content = thread
3646            .update(cx, |thread, cx| {
3647                thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3648            })
3649            .await
3650            .unwrap();
3651
3652        assert_eq!(content, "");
3653
3654        // Invalid
3655        let err = thread
3656            .update(cx, |thread, cx| {
3657                thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3658            })
3659            .await
3660            .unwrap_err();
3661
3662        assert_eq!(
3663            err.to_string(),
3664            "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3665        );
3666    }
3667    #[gpui::test]
3668    async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3669        init_test(cx);
3670
3671        let fs = FakeFs::new(cx.executor());
3672        fs.insert_tree(path!("/tmp"), json!({})).await;
3673        let project = Project::test(fs.clone(), [], cx).await;
3674        project
3675            .update(cx, |project, cx| {
3676                project.find_or_create_worktree(path!("/tmp"), true, cx)
3677            })
3678            .await
3679            .unwrap();
3680
3681        let connection = Rc::new(FakeAgentConnection::new());
3682
3683        let thread = cx
3684            .update(|cx| {
3685                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3686            })
3687            .await
3688            .unwrap();
3689
3690        // Out of project file
3691        let err = thread
3692            .update(cx, |thread, cx| {
3693                thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3694            })
3695            .await
3696            .unwrap_err();
3697
3698        assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3699    }
3700
3701    #[gpui::test]
3702    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
3703        init_test(cx);
3704
3705        let fs = FakeFs::new(cx.executor());
3706        let project = Project::test(fs, [], cx).await;
3707        let id = acp::ToolCallId::new("test");
3708
3709        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3710            let id = id.clone();
3711            move |_, thread, mut cx| {
3712                let id = id.clone();
3713                async move {
3714                    thread
3715                        .update(&mut cx, |thread, cx| {
3716                            thread.handle_session_update(
3717                                acp::SessionUpdate::ToolCall(
3718                                    acp::ToolCall::new(id.clone(), "Label")
3719                                        .kind(acp::ToolKind::Fetch)
3720                                        .status(acp::ToolCallStatus::InProgress),
3721                                ),
3722                                cx,
3723                            )
3724                        })
3725                        .unwrap()
3726                        .unwrap();
3727                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3728                }
3729                .boxed_local()
3730            }
3731        }));
3732
3733        let thread = cx
3734            .update(|cx| {
3735                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3736            })
3737            .await
3738            .unwrap();
3739
3740        let request = thread.update(cx, |thread, cx| {
3741            thread.send_raw("Fetch https://example.com", cx)
3742        });
3743
3744        run_until_first_tool_call(&thread, cx).await;
3745
3746        thread.read_with(cx, |thread, _| {
3747            assert!(matches!(
3748                thread.entries[1],
3749                AgentThreadEntry::ToolCall(ToolCall {
3750                    status: ToolCallStatus::InProgress,
3751                    ..
3752                })
3753            ));
3754        });
3755
3756        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3757
3758        thread.read_with(cx, |thread, _| {
3759            assert!(matches!(
3760                &thread.entries[1],
3761                AgentThreadEntry::ToolCall(ToolCall {
3762                    status: ToolCallStatus::Canceled,
3763                    ..
3764                })
3765            ));
3766        });
3767
3768        thread
3769            .update(cx, |thread, cx| {
3770                thread.handle_session_update(
3771                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3772                        id,
3773                        acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3774                    )),
3775                    cx,
3776                )
3777            })
3778            .unwrap();
3779
3780        request.await.unwrap();
3781
3782        thread.read_with(cx, |thread, _| {
3783            assert!(matches!(
3784                thread.entries[1],
3785                AgentThreadEntry::ToolCall(ToolCall {
3786                    status: ToolCallStatus::Completed,
3787                    ..
3788                })
3789            ));
3790        });
3791    }
3792
3793    #[gpui::test]
3794    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3795        init_test(cx);
3796        let fs = FakeFs::new(cx.background_executor.clone());
3797        fs.insert_tree(path!("/test"), json!({})).await;
3798        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3799
3800        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3801            move |_, thread, mut cx| {
3802                async move {
3803                    thread
3804                        .update(&mut cx, |thread, cx| {
3805                            thread.handle_session_update(
3806                                acp::SessionUpdate::ToolCall(
3807                                    acp::ToolCall::new("test", "Label")
3808                                        .kind(acp::ToolKind::Edit)
3809                                        .status(acp::ToolCallStatus::Completed)
3810                                        .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3811                                            "/test/test.txt",
3812                                            "foo",
3813                                        ))]),
3814                                ),
3815                                cx,
3816                            )
3817                        })
3818                        .unwrap()
3819                        .unwrap();
3820                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3821                }
3822                .boxed_local()
3823            }
3824        }));
3825
3826        let thread = cx
3827            .update(|cx| {
3828                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3829            })
3830            .await
3831            .unwrap();
3832
3833        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3834            .await
3835            .unwrap();
3836
3837        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3838    }
3839
3840    #[gpui::test(iterations = 10)]
3841    async fn test_checkpoints(cx: &mut TestAppContext) {
3842        init_test(cx);
3843        let fs = FakeFs::new(cx.background_executor.clone());
3844        fs.insert_tree(
3845            path!("/test"),
3846            json!({
3847                ".git": {}
3848            }),
3849        )
3850        .await;
3851        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3852
3853        let simulate_changes = Arc::new(AtomicBool::new(true));
3854        let next_filename = Arc::new(AtomicUsize::new(0));
3855        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3856            let simulate_changes = simulate_changes.clone();
3857            let next_filename = next_filename.clone();
3858            let fs = fs.clone();
3859            move |request, thread, mut cx| {
3860                let fs = fs.clone();
3861                let simulate_changes = simulate_changes.clone();
3862                let next_filename = next_filename.clone();
3863                async move {
3864                    if simulate_changes.load(SeqCst) {
3865                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3866                        fs.write(Path::new(&filename), b"").await?;
3867                    }
3868
3869                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3870                        panic!("expected text content block");
3871                    };
3872                    thread.update(&mut cx, |thread, cx| {
3873                        thread
3874                            .handle_session_update(
3875                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3876                                    content.text.to_uppercase().into(),
3877                                )),
3878                                cx,
3879                            )
3880                            .unwrap();
3881                    })?;
3882                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3883                }
3884                .boxed_local()
3885            }
3886        }));
3887        let thread = cx
3888            .update(|cx| {
3889                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3890            })
3891            .await
3892            .unwrap();
3893
3894        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3895            .await
3896            .unwrap();
3897        thread.read_with(cx, |thread, cx| {
3898            assert_eq!(
3899                thread.to_markdown(cx),
3900                indoc! {"
3901                    ## User (checkpoint)
3902
3903                    Lorem
3904
3905                    ## Assistant
3906
3907                    LOREM
3908
3909                "}
3910            );
3911        });
3912        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3913
3914        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3915            .await
3916            .unwrap();
3917        thread.read_with(cx, |thread, cx| {
3918            assert_eq!(
3919                thread.to_markdown(cx),
3920                indoc! {"
3921                    ## User (checkpoint)
3922
3923                    Lorem
3924
3925                    ## Assistant
3926
3927                    LOREM
3928
3929                    ## User (checkpoint)
3930
3931                    ipsum
3932
3933                    ## Assistant
3934
3935                    IPSUM
3936
3937                "}
3938            );
3939        });
3940        assert_eq!(
3941            fs.files(),
3942            vec![
3943                Path::new(path!("/test/file-0")),
3944                Path::new(path!("/test/file-1"))
3945            ]
3946        );
3947
3948        // Checkpoint isn't stored when there are no changes.
3949        simulate_changes.store(false, SeqCst);
3950        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3951            .await
3952            .unwrap();
3953        thread.read_with(cx, |thread, cx| {
3954            assert_eq!(
3955                thread.to_markdown(cx),
3956                indoc! {"
3957                    ## User (checkpoint)
3958
3959                    Lorem
3960
3961                    ## Assistant
3962
3963                    LOREM
3964
3965                    ## User (checkpoint)
3966
3967                    ipsum
3968
3969                    ## Assistant
3970
3971                    IPSUM
3972
3973                    ## User
3974
3975                    dolor
3976
3977                    ## Assistant
3978
3979                    DOLOR
3980
3981                "}
3982            );
3983        });
3984        assert_eq!(
3985            fs.files(),
3986            vec![
3987                Path::new(path!("/test/file-0")),
3988                Path::new(path!("/test/file-1"))
3989            ]
3990        );
3991
3992        // Rewinding the conversation truncates the history and restores the checkpoint.
3993        thread
3994            .update(cx, |thread, cx| {
3995                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3996                    panic!("unexpected entries {:?}", thread.entries)
3997                };
3998                thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3999            })
4000            .await
4001            .unwrap();
4002        thread.read_with(cx, |thread, cx| {
4003            assert_eq!(
4004                thread.to_markdown(cx),
4005                indoc! {"
4006                    ## User (checkpoint)
4007
4008                    Lorem
4009
4010                    ## Assistant
4011
4012                    LOREM
4013
4014                "}
4015            );
4016        });
4017        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
4018    }
4019
4020    #[gpui::test]
4021    async fn test_tool_result_refusal(cx: &mut TestAppContext) {
4022        use std::sync::atomic::AtomicUsize;
4023        init_test(cx);
4024
4025        let fs = FakeFs::new(cx.executor());
4026        let project = Project::test(fs, None, cx).await;
4027
4028        // Create a connection that simulates refusal after tool result
4029        let prompt_count = Arc::new(AtomicUsize::new(0));
4030        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4031            let prompt_count = prompt_count.clone();
4032            move |_request, thread, mut cx| {
4033                let count = prompt_count.fetch_add(1, SeqCst);
4034                async move {
4035                    if count == 0 {
4036                        // First prompt: Generate a tool call with result
4037                        thread.update(&mut cx, |thread, cx| {
4038                            thread
4039                                .handle_session_update(
4040                                    acp::SessionUpdate::ToolCall(
4041                                        acp::ToolCall::new("tool1", "Test Tool")
4042                                            .kind(acp::ToolKind::Fetch)
4043                                            .status(acp::ToolCallStatus::Completed)
4044                                            .raw_input(serde_json::json!({"query": "test"}))
4045                                            .raw_output(serde_json::json!({"result": "inappropriate content"})),
4046                                    ),
4047                                    cx,
4048                                )
4049                                .unwrap();
4050                        })?;
4051
4052                        // Now return refusal because of the tool result
4053                        Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
4054                    } else {
4055                        Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4056                    }
4057                }
4058                .boxed_local()
4059            }
4060        }));
4061
4062        let thread = cx
4063            .update(|cx| {
4064                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4065            })
4066            .await
4067            .unwrap();
4068
4069        // Track if we see a Refusal event
4070        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4071        let saw_refusal_event_captured = saw_refusal_event.clone();
4072        thread.update(cx, |_thread, cx| {
4073            cx.subscribe(
4074                &thread,
4075                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4076                    if matches!(event, AcpThreadEvent::Refusal) {
4077                        *saw_refusal_event_captured.lock().unwrap() = true;
4078                    }
4079                },
4080            )
4081            .detach();
4082        });
4083
4084        // Send a user message - this will trigger tool call and then refusal
4085        let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
4086        cx.background_executor.spawn(send_task).detach();
4087        cx.run_until_parked();
4088
4089        // Verify that:
4090        // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
4091        // 2. The user message was NOT truncated
4092        assert!(
4093            *saw_refusal_event.lock().unwrap(),
4094            "Refusal event should be emitted for tool result refusals"
4095        );
4096
4097        thread.read_with(cx, |thread, _| {
4098            let entries = thread.entries();
4099            assert!(entries.len() >= 2, "Should have user message and tool call");
4100
4101            // Verify user message is still there
4102            assert!(
4103                matches!(entries[0], AgentThreadEntry::UserMessage(_)),
4104                "User message should not be truncated"
4105            );
4106
4107            // Verify tool call is there with result
4108            if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
4109                assert!(
4110                    tool_call.raw_output.is_some(),
4111                    "Tool call should have output"
4112                );
4113            } else {
4114                panic!("Expected tool call at index 1");
4115            }
4116        });
4117    }
4118
4119    #[gpui::test]
4120    async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
4121        init_test(cx);
4122
4123        let fs = FakeFs::new(cx.executor());
4124        let project = Project::test(fs, None, cx).await;
4125
4126        let refuse_next = Arc::new(AtomicBool::new(false));
4127        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4128            let refuse_next = refuse_next.clone();
4129            move |_request, _thread, _cx| {
4130                if refuse_next.load(SeqCst) {
4131                    async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
4132                        .boxed_local()
4133                } else {
4134                    async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
4135                        .boxed_local()
4136                }
4137            }
4138        }));
4139
4140        let thread = cx
4141            .update(|cx| {
4142                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4143            })
4144            .await
4145            .unwrap();
4146
4147        // Track if we see a Refusal event
4148        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4149        let saw_refusal_event_captured = saw_refusal_event.clone();
4150        thread.update(cx, |_thread, cx| {
4151            cx.subscribe(
4152                &thread,
4153                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4154                    if matches!(event, AcpThreadEvent::Refusal) {
4155                        *saw_refusal_event_captured.lock().unwrap() = true;
4156                    }
4157                },
4158            )
4159            .detach();
4160        });
4161
4162        // Send a message that will be refused
4163        refuse_next.store(true, SeqCst);
4164        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4165            .await
4166            .unwrap();
4167
4168        // Verify that a Refusal event WAS emitted for user prompt refusal
4169        assert!(
4170            *saw_refusal_event.lock().unwrap(),
4171            "Refusal event should be emitted for user prompt refusals"
4172        );
4173
4174        // Verify the message was truncated (user prompt refusal)
4175        thread.read_with(cx, |thread, cx| {
4176            assert_eq!(thread.to_markdown(cx), "");
4177        });
4178    }
4179
4180    #[gpui::test]
4181    async fn test_refusal(cx: &mut TestAppContext) {
4182        init_test(cx);
4183        let fs = FakeFs::new(cx.background_executor.clone());
4184        fs.insert_tree(path!("/"), json!({})).await;
4185        let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
4186
4187        let refuse_next = Arc::new(AtomicBool::new(false));
4188        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4189            let refuse_next = refuse_next.clone();
4190            move |request, thread, mut cx| {
4191                let refuse_next = refuse_next.clone();
4192                async move {
4193                    if refuse_next.load(SeqCst) {
4194                        return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
4195                    }
4196
4197                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
4198                        panic!("expected text content block");
4199                    };
4200                    thread.update(&mut cx, |thread, cx| {
4201                        thread
4202                            .handle_session_update(
4203                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
4204                                    content.text.to_uppercase().into(),
4205                                )),
4206                                cx,
4207                            )
4208                            .unwrap();
4209                    })?;
4210                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4211                }
4212                .boxed_local()
4213            }
4214        }));
4215        let thread = cx
4216            .update(|cx| {
4217                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4218            })
4219            .await
4220            .unwrap();
4221
4222        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4223            .await
4224            .unwrap();
4225        thread.read_with(cx, |thread, cx| {
4226            assert_eq!(
4227                thread.to_markdown(cx),
4228                indoc! {"
4229                    ## User
4230
4231                    hello
4232
4233                    ## Assistant
4234
4235                    HELLO
4236
4237                "}
4238            );
4239        });
4240
4241        // Simulate refusing the second message. The message should be truncated
4242        // when a user prompt is refused.
4243        refuse_next.store(true, SeqCst);
4244        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
4245            .await
4246            .unwrap();
4247        thread.read_with(cx, |thread, cx| {
4248            assert_eq!(
4249                thread.to_markdown(cx),
4250                indoc! {"
4251                    ## User
4252
4253                    hello
4254
4255                    ## Assistant
4256
4257                    HELLO
4258
4259                "}
4260            );
4261        });
4262    }
4263
4264    async fn run_until_first_tool_call(
4265        thread: &Entity<AcpThread>,
4266        cx: &mut TestAppContext,
4267    ) -> usize {
4268        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
4269
4270        let subscription = cx.update(|cx| {
4271            cx.subscribe(thread, move |thread, _, cx| {
4272                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
4273                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
4274                        return tx.try_send(ix).unwrap();
4275                    }
4276                }
4277            })
4278        });
4279
4280        select! {
4281            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
4282                panic!("Timeout waiting for tool call")
4283            }
4284            ix = rx.next().fuse() => {
4285                drop(subscription);
4286                ix.unwrap()
4287            }
4288        }
4289    }
4290
4291    #[derive(Clone, Default)]
4292    struct FakeAgentConnection {
4293        auth_methods: Vec<acp::AuthMethod>,
4294        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
4295        set_title_calls: Rc<RefCell<Vec<SharedString>>>,
4296        on_user_message: Option<
4297            Rc<
4298                dyn Fn(
4299                        acp::PromptRequest,
4300                        WeakEntity<AcpThread>,
4301                        AsyncApp,
4302                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4303                    + 'static,
4304            >,
4305        >,
4306    }
4307
4308    impl FakeAgentConnection {
4309        fn new() -> Self {
4310            Self {
4311                auth_methods: Vec::new(),
4312                on_user_message: None,
4313                sessions: Arc::default(),
4314                set_title_calls: Default::default(),
4315            }
4316        }
4317
4318        #[expect(unused)]
4319        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
4320            self.auth_methods = auth_methods;
4321            self
4322        }
4323
4324        fn on_user_message(
4325            mut self,
4326            handler: impl Fn(
4327                acp::PromptRequest,
4328                WeakEntity<AcpThread>,
4329                AsyncApp,
4330            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4331            + 'static,
4332        ) -> Self {
4333            self.on_user_message.replace(Rc::new(handler));
4334            self
4335        }
4336    }
4337
4338    impl AgentConnection for FakeAgentConnection {
4339        fn agent_id(&self) -> AgentId {
4340            AgentId::new("fake")
4341        }
4342
4343        fn telemetry_id(&self) -> SharedString {
4344            "fake".into()
4345        }
4346
4347        fn auth_methods(&self) -> &[acp::AuthMethod] {
4348            &self.auth_methods
4349        }
4350
4351        fn new_session(
4352            self: Rc<Self>,
4353            project: Entity<Project>,
4354            work_dirs: PathList,
4355            cx: &mut App,
4356        ) -> Task<gpui::Result<Entity<AcpThread>>> {
4357            let session_id = acp::SessionId::new(
4358                rand::rng()
4359                    .sample_iter(&distr::Alphanumeric)
4360                    .take(7)
4361                    .map(char::from)
4362                    .collect::<String>(),
4363            );
4364            let action_log = cx.new(|_| ActionLog::new(project.clone()));
4365            let thread = cx.new(|cx| {
4366                AcpThread::new(
4367                    None,
4368                    None,
4369                    Some(work_dirs),
4370                    self.clone(),
4371                    project,
4372                    action_log,
4373                    session_id.clone(),
4374                    watch::Receiver::constant(
4375                        acp::PromptCapabilities::new()
4376                            .image(true)
4377                            .audio(true)
4378                            .embedded_context(true),
4379                    ),
4380                    cx,
4381                )
4382            });
4383            self.sessions.lock().insert(session_id, thread.downgrade());
4384            Task::ready(Ok(thread))
4385        }
4386
4387        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
4388            if self.auth_methods().iter().any(|m| m.id() == &method) {
4389                Task::ready(Ok(()))
4390            } else {
4391                Task::ready(Err(anyhow!("Invalid Auth Method")))
4392            }
4393        }
4394
4395        fn prompt(
4396            &self,
4397            _id: Option<UserMessageId>,
4398            params: acp::PromptRequest,
4399            cx: &mut App,
4400        ) -> Task<gpui::Result<acp::PromptResponse>> {
4401            let sessions = self.sessions.lock();
4402            let thread = sessions.get(&params.session_id).unwrap();
4403            if let Some(handler) = &self.on_user_message {
4404                let handler = handler.clone();
4405                let thread = thread.clone();
4406                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
4407            } else {
4408                Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
4409            }
4410        }
4411
4412        fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
4413
4414        fn truncate(
4415            &self,
4416            session_id: &acp::SessionId,
4417            _cx: &App,
4418        ) -> Option<Rc<dyn AgentSessionTruncate>> {
4419            Some(Rc::new(FakeAgentSessionEditor {
4420                _session_id: session_id.clone(),
4421            }))
4422        }
4423
4424        fn set_title(
4425            &self,
4426            _session_id: &acp::SessionId,
4427            _cx: &App,
4428        ) -> Option<Rc<dyn AgentSessionSetTitle>> {
4429            Some(Rc::new(FakeAgentSessionSetTitle {
4430                calls: self.set_title_calls.clone(),
4431            }))
4432        }
4433
4434        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
4435            self
4436        }
4437    }
4438
4439    struct FakeAgentSessionSetTitle {
4440        calls: Rc<RefCell<Vec<SharedString>>>,
4441    }
4442
4443    impl AgentSessionSetTitle for FakeAgentSessionSetTitle {
4444        fn run(&self, title: SharedString, _cx: &mut App) -> Task<Result<()>> {
4445            self.calls.borrow_mut().push(title);
4446            Task::ready(Ok(()))
4447        }
4448    }
4449
    /// Truncate handle returned by `FakeAgentConnection::truncate`.
    struct FakeAgentSessionEditor {
        /// Session the handle was created for; stored but never read.
        _session_id: acp::SessionId,
    }

    impl AgentSessionTruncate for FakeAgentSessionEditor {
        /// Ignores the message id and reports success right away, without
        /// touching any state.
        fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
            Task::ready(Ok(()))
        }
    }
4459
4460    #[gpui::test]
4461    async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
4462        init_test(cx);
4463
4464        let fs = FakeFs::new(cx.executor());
4465        let project = Project::test(fs, [], cx).await;
4466        let connection = Rc::new(FakeAgentConnection::new());
4467        let thread = cx
4468            .update(|cx| {
4469                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4470            })
4471            .await
4472            .unwrap();
4473
4474        // Try to update a tool call that doesn't exist
4475        let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4476        thread.update(cx, |thread, cx| {
4477            let result = thread.handle_session_update(
4478                acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4479                    nonexistent_id.clone(),
4480                    acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4481                )),
4482                cx,
4483            );
4484
4485            // The update should succeed (not return an error)
4486            assert!(result.is_ok());
4487
4488            // There should now be exactly one entry in the thread
4489            assert_eq!(thread.entries.len(), 1);
4490
4491            // The entry should be a failed tool call
4492            if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4493                assert_eq!(tool_call.id, nonexistent_id);
4494                assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4495                assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4496
4497                // Check that the content contains the error message
4498                assert_eq!(tool_call.content.len(), 1);
4499                if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4500                    match content_block {
4501                        ContentBlock::Markdown { markdown } => {
4502                            let markdown_text = markdown.read(cx).source();
4503                            assert!(markdown_text.contains("Tool call not found"));
4504                        }
4505                        ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4506                        ContentBlock::ResourceLink { .. } => {
4507                            panic!("Expected markdown content, got resource link")
4508                        }
4509                        ContentBlock::Image { .. } => {
4510                            panic!("Expected markdown content, got image")
4511                        }
4512                    }
4513                } else {
4514                    panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4515                }
4516            } else {
4517                panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4518            }
4519        });
4520    }
4521
4522    /// Tests that restoring a checkpoint properly cleans up terminals that were
4523    /// created after that checkpoint, and cancels any in-progress generation.
4524    ///
4525    /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4526    /// that were started after that checkpoint should be terminated, and any in-progress
4527    /// AI generation should be canceled.
4528    #[gpui::test]
4529    async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4530        init_test(cx);
4531
4532        let fs = FakeFs::new(cx.executor());
4533        let project = Project::test(fs, [], cx).await;
4534        let connection = Rc::new(FakeAgentConnection::new());
4535        let thread = cx
4536            .update(|cx| {
4537                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4538            })
4539            .await
4540            .unwrap();
4541
4542        // Send first user message to create a checkpoint
4543        cx.update(|cx| {
4544            thread.update(cx, |thread, cx| {
4545                thread.send(vec!["first message".into()], cx)
4546            })
4547        })
4548        .await
4549        .unwrap();
4550
4551        // Send second message (creates another checkpoint) - we'll restore to this one
4552        cx.update(|cx| {
4553            thread.update(cx, |thread, cx| {
4554                thread.send(vec!["second message".into()], cx)
4555            })
4556        })
4557        .await
4558        .unwrap();
4559
4560        // Create 2 terminals BEFORE the checkpoint that have completed running
4561        let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4562        let mock_terminal_1 = cx.new(|cx| {
4563            let builder = ::terminal::TerminalBuilder::new_display_only(
4564                ::terminal::terminal_settings::CursorShape::default(),
4565                ::terminal::terminal_settings::AlternateScroll::On,
4566                None,
4567                0,
4568                cx.background_executor(),
4569                PathStyle::local(),
4570            )
4571            .unwrap();
4572            builder.subscribe(cx)
4573        });
4574
4575        thread.update(cx, |thread, cx| {
4576            thread.on_terminal_provider_event(
4577                TerminalProviderEvent::Created {
4578                    terminal_id: terminal_id_1.clone(),
4579                    label: "echo 'first'".to_string(),
4580                    cwd: Some(PathBuf::from("/test")),
4581                    output_byte_limit: None,
4582                    terminal: mock_terminal_1.clone(),
4583                },
4584                cx,
4585            );
4586        });
4587
4588        thread.update(cx, |thread, cx| {
4589            thread.on_terminal_provider_event(
4590                TerminalProviderEvent::Output {
4591                    terminal_id: terminal_id_1.clone(),
4592                    data: b"first\n".to_vec(),
4593                },
4594                cx,
4595            );
4596        });
4597
4598        thread.update(cx, |thread, cx| {
4599            thread.on_terminal_provider_event(
4600                TerminalProviderEvent::Exit {
4601                    terminal_id: terminal_id_1.clone(),
4602                    status: acp::TerminalExitStatus::new().exit_code(0),
4603                },
4604                cx,
4605            );
4606        });
4607
4608        let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4609        let mock_terminal_2 = cx.new(|cx| {
4610            let builder = ::terminal::TerminalBuilder::new_display_only(
4611                ::terminal::terminal_settings::CursorShape::default(),
4612                ::terminal::terminal_settings::AlternateScroll::On,
4613                None,
4614                0,
4615                cx.background_executor(),
4616                PathStyle::local(),
4617            )
4618            .unwrap();
4619            builder.subscribe(cx)
4620        });
4621
4622        thread.update(cx, |thread, cx| {
4623            thread.on_terminal_provider_event(
4624                TerminalProviderEvent::Created {
4625                    terminal_id: terminal_id_2.clone(),
4626                    label: "echo 'second'".to_string(),
4627                    cwd: Some(PathBuf::from("/test")),
4628                    output_byte_limit: None,
4629                    terminal: mock_terminal_2.clone(),
4630                },
4631                cx,
4632            );
4633        });
4634
4635        thread.update(cx, |thread, cx| {
4636            thread.on_terminal_provider_event(
4637                TerminalProviderEvent::Output {
4638                    terminal_id: terminal_id_2.clone(),
4639                    data: b"second\n".to_vec(),
4640                },
4641                cx,
4642            );
4643        });
4644
4645        thread.update(cx, |thread, cx| {
4646            thread.on_terminal_provider_event(
4647                TerminalProviderEvent::Exit {
4648                    terminal_id: terminal_id_2.clone(),
4649                    status: acp::TerminalExitStatus::new().exit_code(0),
4650                },
4651                cx,
4652            );
4653        });
4654
4655        // Get the second message ID to restore to
4656        let second_message_id = thread.read_with(cx, |thread, _| {
4657            // At this point we have:
4658            // - Index 0: First user message (with checkpoint)
4659            // - Index 1: Second user message (with checkpoint)
4660            // No assistant responses because FakeAgentConnection just returns EndTurn
4661            let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4662                panic!("expected user message at index 1");
4663            };
4664            message.id.clone().unwrap()
4665        });
4666
4667        // Create a terminal AFTER the checkpoint we'll restore to.
4668        // This simulates the AI agent starting a long-running terminal command.
4669        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4670        let mock_terminal = cx.new(|cx| {
4671            let builder = ::terminal::TerminalBuilder::new_display_only(
4672                ::terminal::terminal_settings::CursorShape::default(),
4673                ::terminal::terminal_settings::AlternateScroll::On,
4674                None,
4675                0,
4676                cx.background_executor(),
4677                PathStyle::local(),
4678            )
4679            .unwrap();
4680            builder.subscribe(cx)
4681        });
4682
4683        // Register the terminal as created
4684        thread.update(cx, |thread, cx| {
4685            thread.on_terminal_provider_event(
4686                TerminalProviderEvent::Created {
4687                    terminal_id: terminal_id.clone(),
4688                    label: "sleep 1000".to_string(),
4689                    cwd: Some(PathBuf::from("/test")),
4690                    output_byte_limit: None,
4691                    terminal: mock_terminal.clone(),
4692                },
4693                cx,
4694            );
4695        });
4696
4697        // Simulate the terminal producing output (still running)
4698        thread.update(cx, |thread, cx| {
4699            thread.on_terminal_provider_event(
4700                TerminalProviderEvent::Output {
4701                    terminal_id: terminal_id.clone(),
4702                    data: b"terminal is running...\n".to_vec(),
4703                },
4704                cx,
4705            );
4706        });
4707
4708        // Create a tool call entry that references this terminal
4709        // This represents the agent requesting a terminal command
4710        thread.update(cx, |thread, cx| {
4711            thread
4712                .handle_session_update(
4713                    acp::SessionUpdate::ToolCall(
4714                        acp::ToolCall::new("terminal-tool-1", "Running command")
4715                            .kind(acp::ToolKind::Execute)
4716                            .status(acp::ToolCallStatus::InProgress)
4717                            .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4718                                terminal_id.clone(),
4719                            ))])
4720                            .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4721                    ),
4722                    cx,
4723                )
4724                .unwrap();
4725        });
4726
4727        // Verify terminal exists and is in the thread
4728        let terminal_exists_before =
4729            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4730        assert!(
4731            terminal_exists_before,
4732            "Terminal should exist before checkpoint restore"
4733        );
4734
4735        // Verify the terminal's underlying task is still running (not completed)
4736        let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4737            let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4738            terminal_entity.read_with(cx, |term, _cx| {
4739                term.output().is_none() // output is None means it's still running
4740            })
4741        });
4742        assert!(
4743            terminal_running_before,
4744            "Terminal should be running before checkpoint restore"
4745        );
4746
4747        // Verify we have the expected entries before restore
4748        let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4749        assert!(
4750            entry_count_before > 1,
4751            "Should have multiple entries before restore"
4752        );
4753
4754        // Restore the checkpoint to the second message.
4755        // This should:
4756        // 1. Cancel any in-progress generation (via the cancel() call)
4757        // 2. Remove the terminal that was created after that point
4758        thread
4759            .update(cx, |thread, cx| {
4760                thread.restore_checkpoint(second_message_id, cx)
4761            })
4762            .await
4763            .unwrap();
4764
4765        // Verify that no send_task is in progress after restore
4766        // (cancel() clears the send_task)
4767        let has_send_task_after = thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4768        assert!(
4769            !has_send_task_after,
4770            "Should not have a send_task after restore (cancel should have cleared it)"
4771        );
4772
4773        // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4774        let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4775        assert_eq!(
4776            entry_count, 1,
4777            "Should have 1 entry after restore (only the first user message)"
4778        );
4779
4780        // Verify the 2 completed terminals from before the checkpoint still exist
4781        let terminal_1_exists = thread.read_with(cx, |thread, _| {
4782            thread.terminals.contains_key(&terminal_id_1)
4783        });
4784        assert!(
4785            terminal_1_exists,
4786            "Terminal 1 (from before checkpoint) should still exist"
4787        );
4788
4789        let terminal_2_exists = thread.read_with(cx, |thread, _| {
4790            thread.terminals.contains_key(&terminal_id_2)
4791        });
4792        assert!(
4793            terminal_2_exists,
4794            "Terminal 2 (from before checkpoint) should still exist"
4795        );
4796
4797        // Verify they're still in completed state
4798        let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4799            let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4800            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4801        });
4802        assert!(terminal_1_completed, "Terminal 1 should still be completed");
4803
4804        let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4805            let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4806            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4807        });
4808        assert!(terminal_2_completed, "Terminal 2 should still be completed");
4809
4810        // Verify the running terminal (created after checkpoint) was removed
4811        let terminal_3_exists =
4812            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4813        assert!(
4814            !terminal_3_exists,
4815            "Terminal 3 (created after checkpoint) should have been removed"
4816        );
4817
4818        // Verify total count is 2 (the two from before the checkpoint)
4819        let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4820        assert_eq!(
4821            terminal_count, 2,
4822            "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4823        );
4824    }
4825
4826    /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4827    /// even when a new user message is added while the async checkpoint comparison is in progress.
4828    ///
4829    /// This is a regression test for a bug where update_last_checkpoint would fail with
4830    /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4831    /// update_last_checkpoint started and when its async closure ran.
4832    #[gpui::test]
4833    async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4834        init_test(cx);
4835
4836        let fs = FakeFs::new(cx.executor());
4837        fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4838            .await;
4839        let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4840
4841        let handler_done = Arc::new(AtomicBool::new(false));
4842        let handler_done_clone = handler_done.clone();
4843        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4844            move |_, _thread, _cx| {
4845                handler_done_clone.store(true, SeqCst);
4846                async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4847            },
4848        ));
4849
4850        let thread = cx
4851            .update(|cx| {
4852                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4853            })
4854            .await
4855            .unwrap();
4856
4857        let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4858        let send_task = cx.background_executor.spawn(send_future);
4859
4860        // Tick until handler completes, then a few more to let update_last_checkpoint start
4861        while !handler_done.load(SeqCst) {
4862            cx.executor().tick();
4863        }
4864        for _ in 0..5 {
4865            cx.executor().tick();
4866        }
4867
4868        thread.update(cx, |thread, cx| {
4869            thread.push_entry(
4870                AgentThreadEntry::UserMessage(UserMessage {
4871                    id: Some(UserMessageId::new()),
4872                    content: ContentBlock::Empty,
4873                    chunks: vec!["Injected message (no checkpoint)".into()],
4874                    checkpoint: None,
4875                    indented: false,
4876                }),
4877                cx,
4878            );
4879        });
4880
4881        cx.run_until_parked();
4882        let result = send_task.await;
4883
4884        assert!(
4885            result.is_ok(),
4886            "send should succeed even when new message added during update_last_checkpoint: {:?}",
4887            result.err()
4888        );
4889    }
4890
4891    /// Tests that when a follow-up message is sent during generation,
4892    /// the first turn completing does NOT clear `running_turn` because
4893    /// it now belongs to the second turn.
4894    #[gpui::test]
4895    async fn test_follow_up_message_during_generation_does_not_clear_turn(cx: &mut TestAppContext) {
4896        init_test(cx);
4897
4898        let fs = FakeFs::new(cx.executor());
4899        let project = Project::test(fs, [], cx).await;
4900
4901        // First handler waits for this signal before completing
4902        let (first_complete_tx, first_complete_rx) = futures::channel::oneshot::channel::<()>();
4903        let first_complete_rx = RefCell::new(Some(first_complete_rx));
4904
4905        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4906            move |params, _thread, _cx| {
4907                let first_complete_rx = first_complete_rx.borrow_mut().take();
4908                let is_first = params
4909                    .prompt
4910                    .iter()
4911                    .any(|c| matches!(c, acp::ContentBlock::Text(t) if t.text.contains("first")));
4912
4913                async move {
4914                    if is_first {
4915                        // First handler waits until signaled
4916                        if let Some(rx) = first_complete_rx {
4917                            rx.await.ok();
4918                        }
4919                    }
4920                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4921                }
4922                .boxed_local()
4923            }
4924        }));
4925
4926        let thread = cx
4927            .update(|cx| {
4928                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4929            })
4930            .await
4931            .unwrap();
4932
4933        // Send first message (turn_id=1) - handler will block
4934        let first_request = thread.update(cx, |thread, cx| thread.send_raw("first", cx));
4935        assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 1);
4936
4937        // Send second message (turn_id=2) while first is still blocked
4938        // This calls cancel() which takes turn 1's running_turn and sets turn 2's
4939        let second_request = thread.update(cx, |thread, cx| thread.send_raw("second", cx));
4940        assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 2);
4941
4942        let running_turn_after_second_send =
4943            thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4944        assert_eq!(
4945            running_turn_after_second_send,
4946            Some(2),
4947            "running_turn should be set to turn 2 after sending second message"
4948        );
4949
4950        // Now signal first handler to complete
4951        first_complete_tx.send(()).ok();
4952
4953        // First request completes - should NOT clear running_turn
4954        // because running_turn now belongs to turn 2
4955        first_request.await.unwrap();
4956
4957        let running_turn_after_first =
4958            thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4959        assert_eq!(
4960            running_turn_after_first,
4961            Some(2),
4962            "first turn completing should not clear running_turn (belongs to turn 2)"
4963        );
4964
4965        // Second request completes - SHOULD clear running_turn
4966        second_request.await.unwrap();
4967
4968        let running_turn_after_second =
4969            thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4970        assert!(
4971            !running_turn_after_second,
4972            "second turn completing should clear running_turn"
4973        );
4974    }
4975
4976    #[gpui::test]
4977    async fn test_send_returns_cancelled_response_and_marks_tools_as_cancelled(
4978        cx: &mut TestAppContext,
4979    ) {
4980        init_test(cx);
4981
4982        let fs = FakeFs::new(cx.executor());
4983        let project = Project::test(fs, [], cx).await;
4984
4985        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4986            move |_params, thread, mut cx| {
4987                async move {
4988                    thread
4989                        .update(&mut cx, |thread, cx| {
4990                            thread.handle_session_update(
4991                                acp::SessionUpdate::ToolCall(
4992                                    acp::ToolCall::new(
4993                                        acp::ToolCallId::new("test-tool"),
4994                                        "Test Tool",
4995                                    )
4996                                    .kind(acp::ToolKind::Fetch)
4997                                    .status(acp::ToolCallStatus::InProgress),
4998                                ),
4999                                cx,
5000                            )
5001                        })
5002                        .unwrap()
5003                        .unwrap();
5004
5005                    Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
5006                }
5007                .boxed_local()
5008            },
5009        ));
5010
5011        let thread = cx
5012            .update(|cx| {
5013                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5014            })
5015            .await
5016            .unwrap();
5017
5018        let response = thread
5019            .update(cx, |thread, cx| thread.send_raw("test message", cx))
5020            .await;
5021
5022        let response = response
5023            .expect("send should succeed")
5024            .expect("should have response");
5025        assert_eq!(
5026            response.stop_reason,
5027            acp::StopReason::Cancelled,
5028            "response should have Cancelled stop_reason"
5029        );
5030
5031        thread.read_with(cx, |thread, _| {
5032            let tool_entry = thread
5033                .entries
5034                .iter()
5035                .find_map(|e| {
5036                    if let AgentThreadEntry::ToolCall(call) = e {
5037                        Some(call)
5038                    } else {
5039                        None
5040                    }
5041                })
5042                .expect("should have tool call entry");
5043
5044            assert!(
5045                matches!(tool_entry.status, ToolCallStatus::Canceled),
5046                "tool should be marked as Canceled when response is Cancelled, got {:?}",
5047                tool_entry.status
5048            );
5049        });
5050    }
5051
5052    #[gpui::test]
5053    async fn test_provisional_title_replaced_by_real_title(cx: &mut TestAppContext) {
5054        init_test(cx);
5055
5056        let fs = FakeFs::new(cx.executor());
5057        let project = Project::test(fs, [], cx).await;
5058        let connection = Rc::new(FakeAgentConnection::new());
5059        let set_title_calls = connection.set_title_calls.clone();
5060
5061        let thread = cx
5062            .update(|cx| {
5063                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5064            })
5065            .await
5066            .unwrap();
5067
5068        // Initial title is the default.
5069        thread.read_with(cx, |thread, _| {
5070            assert_eq!(thread.title(), None);
5071        });
5072
5073        // Setting a provisional title updates the display title.
5074        thread.update(cx, |thread, cx| {
5075            thread.set_provisional_title("Hello, can you help…".into(), cx);
5076        });
5077        thread.read_with(cx, |thread, _| {
5078            assert_eq!(
5079                thread.title().as_ref().map(|s| s.as_str()),
5080                Some("Hello, can you help…")
5081            );
5082        });
5083
5084        // The provisional title should NOT have propagated to the connection.
5085        assert_eq!(
5086            set_title_calls.borrow().len(),
5087            0,
5088            "provisional title should not propagate to the connection"
5089        );
5090
5091        // When the real title arrives via set_title, it replaces the
5092        // provisional title and propagates to the connection.
5093        let task = thread.update(cx, |thread, cx| {
5094            thread.set_title("Helping with Rust question".into(), cx)
5095        });
5096        task.await.expect("set_title should succeed");
5097        thread.read_with(cx, |thread, _| {
5098            assert_eq!(
5099                thread.title().as_ref().map(|s| s.as_str()),
5100                Some("Helping with Rust question")
5101            );
5102        });
5103        assert_eq!(
5104            set_title_calls.borrow().as_slice(),
5105            &[SharedString::from("Helping with Rust question")],
5106            "real title should propagate to the connection"
5107        );
5108    }
5109
5110    #[gpui::test]
5111    async fn test_session_info_update_replaces_provisional_title_and_emits_event(
5112        cx: &mut TestAppContext,
5113    ) {
5114        init_test(cx);
5115
5116        let fs = FakeFs::new(cx.executor());
5117        let project = Project::test(fs, [], cx).await;
5118        let connection = Rc::new(FakeAgentConnection::new());
5119
5120        let thread = cx
5121            .update(|cx| {
5122                connection.clone().new_session(
5123                    project,
5124                    PathList::new(&[Path::new(path!("/test"))]),
5125                    cx,
5126                )
5127            })
5128            .await
5129            .unwrap();
5130
5131        let title_updated_events = Rc::new(RefCell::new(0usize));
5132        let title_updated_events_for_subscription = title_updated_events.clone();
5133        thread.update(cx, |_thread, cx| {
5134            cx.subscribe(
5135                &thread,
5136                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
5137                    if matches!(event, AcpThreadEvent::TitleUpdated) {
5138                        *title_updated_events_for_subscription.borrow_mut() += 1;
5139                    }
5140                },
5141            )
5142            .detach();
5143        });
5144
5145        thread.update(cx, |thread, cx| {
5146            thread.set_provisional_title("Hello, can you help…".into(), cx);
5147        });
5148        assert_eq!(
5149            *title_updated_events.borrow(),
5150            1,
5151            "setting a provisional title should emit TitleUpdated"
5152        );
5153
5154        let result = thread.update(cx, |thread, cx| {
5155            thread.handle_session_update(
5156                acp::SessionUpdate::SessionInfoUpdate(
5157                    acp::SessionInfoUpdate::new().title("Helping with Rust question"),
5158                ),
5159                cx,
5160            )
5161        });
5162        result.expect("session info update should succeed");
5163
5164        thread.read_with(cx, |thread, _| {
5165            assert_eq!(
5166                thread.title().as_ref().map(|s| s.as_str()),
5167                Some("Helping with Rust question")
5168            );
5169            assert!(
5170                !thread.has_provisional_title(),
5171                "session info title update should clear provisional title"
5172            );
5173        });
5174
5175        assert_eq!(
5176            *title_updated_events.borrow(),
5177            2,
5178            "session info title update should emit TitleUpdated"
5179        );
5180        assert!(
5181            connection.set_title_calls.borrow().is_empty(),
5182            "session info title update should not propagate back to the connection"
5183        );
5184    }
5185}