acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5use action_log::{ActionLog, ActionLogTelemetry};
   6use agent_client_protocol::{self as acp};
   7use anyhow::{Context as _, Result, anyhow};
   8use collections::HashSet;
   9pub use connection::*;
  10pub use diff::*;
  11use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  12use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  13use itertools::Itertools;
  14use language::language_settings::FormatOnSave;
  15use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  16use markdown::Markdown;
  17pub use mention::*;
  18use project::lsp_store::{FormatTrigger, LspFormatTarget};
  19use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  20use serde::{Deserialize, Serialize};
  21use serde_json::to_string_pretty;
  22use std::collections::HashMap;
  23use std::error::Error;
  24use std::fmt::{Formatter, Write};
  25use std::ops::Range;
  26use std::process::ExitStatus;
  27use std::rc::Rc;
  28use std::time::{Duration, Instant};
  29use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  30use task::{Shell, ShellBuilder};
  31pub use terminal::*;
  32use text::Bias;
  33use ui::App;
  34use util::markdown::MarkdownEscaped;
  35use util::path_list::PathList;
  36use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
  37use uuid::Uuid;
  38
/// Key used in ACP ToolCall meta to store the tool's programmatic name.
/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
///
/// Written by [`meta_with_tool_name`] and read back by [`tool_name_from_meta`].
pub const TOOL_NAME_META_KEY: &str = "tool_name";
  42
  43/// Helper to extract tool name from ACP meta
  44pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
  45    meta.as_ref()
  46        .and_then(|m| m.get(TOOL_NAME_META_KEY))
  47        .and_then(|v| v.as_str())
  48        .map(|s| SharedString::from(s.to_owned()))
  49}
  50
  51/// Helper to create meta with tool name
  52pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
  53    acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
  54}
  55
/// Key used in ACP ToolCall meta to store the session id and message indexes
/// of a subagent session. The value is read back as a [`SubagentSessionInfo`]
/// by [`subagent_session_info_from_meta`].
pub const SUBAGENT_SESSION_INFO_META_KEY: &str = "subagent_session_info";
  58
/// Identifies a subagent session and the range of messages it covers.
///
/// Carried in ACP ToolCall meta under [`SUBAGENT_SESSION_INFO_META_KEY`].
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SubagentSessionInfo {
    /// The session id of the subagent session that was spawned
    pub session_id: acp::SessionId,
    /// The index of the message of the start of the "turn" run by this tool call
    pub message_start_index: usize,
    /// The index of the output of the message that the subagent has returned.
    /// Left out of the serialized form while it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message_end_index: Option<usize>,
}
  69
  70/// Helper to extract subagent session id from ACP meta
  71pub fn subagent_session_info_from_meta(meta: &Option<acp::Meta>) -> Option<SubagentSessionInfo> {
  72    meta.as_ref()
  73        .and_then(|m| m.get(SUBAGENT_SESSION_INFO_META_KEY))
  74        .and_then(|v| serde_json::from_value(v.clone()).ok())
  75}
  76
/// A user message entry in the thread.
#[derive(Debug)]
pub struct UserMessage {
    /// Optional identifier of this message.
    pub id: Option<UserMessageId>,
    /// Content that `to_markdown` renders for this message.
    pub content: ContentBlock,
    /// ACP content blocks of this message.
    /// NOTE(review): presumably the raw blocks `content` was built from — confirm.
    pub chunks: Vec<acp::ContentBlock>,
    /// Optional checkpoint attached to this message.
    pub checkpoint: Option<Checkpoint>,
    /// Value reported by `AgentThreadEntry::is_indented` for this entry.
    pub indented: bool,
}
  85
/// A git checkpoint attached to a [`UserMessage`].
#[derive(Debug)]
pub struct Checkpoint {
    /// The underlying git store checkpoint.
    git_checkpoint: GitStoreCheckpoint,
    /// When `true`, the message's markdown heading becomes `## User (checkpoint)`.
    pub show: bool,
}
  91
  92impl UserMessage {
  93    fn to_markdown(&self, cx: &App) -> String {
  94        let mut markdown = String::new();
  95        if self
  96            .checkpoint
  97            .as_ref()
  98            .is_some_and(|checkpoint| checkpoint.show)
  99        {
 100            writeln!(markdown, "## User (checkpoint)").unwrap();
 101        } else {
 102            writeln!(markdown, "## User").unwrap();
 103        }
 104        writeln!(markdown).unwrap();
 105        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
 106        writeln!(markdown).unwrap();
 107        markdown
 108    }
 109}
 110
/// An assistant message entry in the thread, made of one or more chunks.
#[derive(Debug, PartialEq)]
pub struct AssistantMessage {
    /// Chunks of the message, rendered in order by `to_markdown`.
    pub chunks: Vec<AssistantMessageChunk>,
    /// Value reported by `AgentThreadEntry::is_indented` for this entry.
    pub indented: bool,
    /// NOTE(review): presumably marks messages produced by a subagent — not
    /// read in this part of the file; confirm against callers.
    pub is_subagent_output: bool,
}
 117
 118impl AssistantMessage {
 119    pub fn to_markdown(&self, cx: &App) -> String {
 120        format!(
 121            "## Assistant\n\n{}\n\n",
 122            self.chunks
 123                .iter()
 124                .map(|chunk| chunk.to_markdown(cx))
 125                .join("\n\n")
 126        )
 127    }
 128}
 129
/// One chunk of an [`AssistantMessage`].
#[derive(Debug, PartialEq)]
pub enum AssistantMessageChunk {
    /// Regular message content; rendered as-is in markdown.
    Message { block: ContentBlock },
    /// Thought content; rendered wrapped in `<thinking>` tags in markdown.
    Thought { block: ContentBlock },
}
 135
 136impl AssistantMessageChunk {
 137    pub fn from_str(
 138        chunk: &str,
 139        language_registry: &Arc<LanguageRegistry>,
 140        path_style: PathStyle,
 141        cx: &mut App,
 142    ) -> Self {
 143        Self::Message {
 144            block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
 145        }
 146    }
 147
 148    fn to_markdown(&self, cx: &App) -> String {
 149        match self {
 150            Self::Message { block } => block.to_markdown(cx).to_string(),
 151            Self::Thought { block } => {
 152                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 153            }
 154        }
 155    }
 156}
 157
/// A single entry in an agent thread.
#[derive(Debug)]
pub enum AgentThreadEntry {
    /// A message written by the user.
    UserMessage(UserMessage),
    /// A message produced by the assistant.
    AssistantMessage(AssistantMessage),
    /// A tool call and its state.
    ToolCall(ToolCall),
    /// Plan entries, all rendered as checked items (`- [x]`) in markdown.
    CompletedPlan(Vec<PlanEntry>),
}
 165
 166impl AgentThreadEntry {
 167    pub fn is_indented(&self) -> bool {
 168        match self {
 169            Self::UserMessage(message) => message.indented,
 170            Self::AssistantMessage(message) => message.indented,
 171            Self::ToolCall(_) => false,
 172            Self::CompletedPlan(_) => false,
 173        }
 174    }
 175
 176    pub fn to_markdown(&self, cx: &App) -> String {
 177        match self {
 178            Self::UserMessage(message) => message.to_markdown(cx),
 179            Self::AssistantMessage(message) => message.to_markdown(cx),
 180            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 181            Self::CompletedPlan(entries) => {
 182                let mut md = String::from("## Plan\n\n");
 183                for entry in entries {
 184                    let source = entry.content.read(cx).source().to_string();
 185                    md.push_str(&format!("- [x] {}\n", source));
 186                }
 187                md
 188            }
 189        }
 190    }
 191
 192    pub fn user_message(&self) -> Option<&UserMessage> {
 193        if let AgentThreadEntry::UserMessage(message) = self {
 194            Some(message)
 195        } else {
 196            None
 197        }
 198    }
 199
 200    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 201        if let AgentThreadEntry::ToolCall(call) = self {
 202            itertools::Either::Left(call.diffs())
 203        } else {
 204            itertools::Either::Right(std::iter::empty())
 205        }
 206    }
 207
 208    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 209        if let AgentThreadEntry::ToolCall(call) = self {
 210            itertools::Either::Left(call.terminals())
 211        } else {
 212            itertools::Either::Right(std::iter::empty())
 213        }
 214    }
 215
 216    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 217        if let AgentThreadEntry::ToolCall(ToolCall {
 218            locations,
 219            resolved_locations,
 220            ..
 221        }) = self
 222        {
 223            Some((
 224                locations.get(ix)?.clone(),
 225                resolved_locations.get(ix)?.clone()?,
 226            ))
 227        } else {
 228            None
 229        }
 230    }
 231}
 232
/// Thread-side state of a single ACP tool call.
#[derive(Debug)]
pub struct ToolCall {
    /// Identifier taken from the ACP tool call.
    pub id: acp::ToolCallId,
    /// Markdown label built from the tool call's title.
    pub label: Entity<Markdown>,
    /// ACP tool kind; it also decides how a title is turned into `label`.
    pub kind: acp::ToolKind,
    /// Content items converted from the ACP content list.
    pub content: Vec<ToolCallContent>,
    /// Current status of the call.
    pub status: ToolCallStatus,
    /// Locations as received over ACP.
    pub locations: Vec<acp::ToolCallLocation>,
    /// Resolved counterparts of `locations`; `AgentThreadEntry::location`
    /// reads both lists at the same index.
    pub resolved_locations: Vec<Option<AgentLocation>>,
    /// Raw input as received, if any.
    pub raw_input: Option<serde_json::Value>,
    /// Markdown rendering of `raw_input`, when one could be produced.
    pub raw_input_markdown: Option<Entity<Markdown>>,
    /// Raw output as received, if any.
    pub raw_output: Option<serde_json::Value>,
    /// Programmatic tool name read from meta under `TOOL_NAME_META_KEY`.
    pub tool_name: Option<SharedString>,
    /// Subagent session details read from meta under
    /// `SUBAGENT_SESSION_INFO_META_KEY`.
    pub subagent_session_info: Option<SubagentSessionInfo>,
}
 248
 249impl ToolCall {
 250    fn from_acp(
 251        tool_call: acp::ToolCall,
 252        status: ToolCallStatus,
 253        language_registry: Arc<LanguageRegistry>,
 254        path_style: PathStyle,
 255        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 256        cx: &mut App,
 257    ) -> Result<Self> {
 258        let title = if tool_call.kind == acp::ToolKind::Execute {
 259            tool_call.title
 260        } else if tool_call.kind == acp::ToolKind::Edit {
 261            MarkdownEscaped(tool_call.title.as_str()).to_string()
 262        } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
 263            first_line.to_owned() + ""
 264        } else {
 265            tool_call.title
 266        };
 267        let mut content = Vec::with_capacity(tool_call.content.len());
 268        for item in tool_call.content {
 269            if let Some(item) = ToolCallContent::from_acp(
 270                item,
 271                language_registry.clone(),
 272                path_style,
 273                terminals,
 274                cx,
 275            )? {
 276                content.push(item);
 277            }
 278        }
 279
 280        let raw_input_markdown = tool_call
 281            .raw_input
 282            .as_ref()
 283            .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
 284
 285        let tool_name = tool_name_from_meta(&tool_call.meta);
 286
 287        let subagent_session_info = subagent_session_info_from_meta(&tool_call.meta);
 288
 289        let result = Self {
 290            id: tool_call.tool_call_id,
 291            label: cx
 292                .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
 293            kind: tool_call.kind,
 294            content,
 295            locations: tool_call.locations,
 296            resolved_locations: Vec::default(),
 297            status,
 298            raw_input: tool_call.raw_input,
 299            raw_input_markdown,
 300            raw_output: tool_call.raw_output,
 301            tool_name,
 302            subagent_session_info,
 303        };
 304        Ok(result)
 305    }
 306
 307    fn update_fields(
 308        &mut self,
 309        fields: acp::ToolCallUpdateFields,
 310        meta: Option<acp::Meta>,
 311        language_registry: Arc<LanguageRegistry>,
 312        path_style: PathStyle,
 313        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 314        cx: &mut App,
 315    ) -> Result<()> {
 316        let acp::ToolCallUpdateFields {
 317            kind,
 318            status,
 319            title,
 320            content,
 321            locations,
 322            raw_input,
 323            raw_output,
 324            ..
 325        } = fields;
 326
 327        if let Some(kind) = kind {
 328            self.kind = kind;
 329        }
 330
 331        if let Some(status) = status {
 332            self.status = status.into();
 333        }
 334
 335        if let Some(subagent_session_info) = subagent_session_info_from_meta(&meta) {
 336            self.subagent_session_info = Some(subagent_session_info);
 337        }
 338
 339        if let Some(title) = title {
 340            if self.kind == acp::ToolKind::Execute {
 341                for terminal in self.terminals() {
 342                    terminal.update(cx, |terminal, cx| {
 343                        terminal.update_command_label(&title, cx);
 344                    });
 345                }
 346            }
 347            self.label.update(cx, |label, cx| {
 348                if self.kind == acp::ToolKind::Execute {
 349                    label.replace(title, cx);
 350                } else if self.kind == acp::ToolKind::Edit {
 351                    label.replace(MarkdownEscaped(&title).to_string(), cx)
 352                } else if let Some((first_line, _)) = title.split_once("\n") {
 353                    label.replace(first_line.to_owned() + "", cx);
 354                } else {
 355                    label.replace(title, cx);
 356                }
 357            });
 358        }
 359
 360        if let Some(content) = content {
 361            let mut new_content_len = content.len();
 362            let mut content = content.into_iter();
 363
 364            // Reuse existing content if we can
 365            for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
 366                let valid_content =
 367                    old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
 368                if !valid_content {
 369                    new_content_len -= 1;
 370                }
 371            }
 372            for new in content {
 373                if let Some(new) = ToolCallContent::from_acp(
 374                    new,
 375                    language_registry.clone(),
 376                    path_style,
 377                    terminals,
 378                    cx,
 379                )? {
 380                    self.content.push(new);
 381                } else {
 382                    new_content_len -= 1;
 383                }
 384            }
 385            self.content.truncate(new_content_len);
 386        }
 387
 388        if let Some(locations) = locations {
 389            self.locations = locations;
 390        }
 391
 392        if let Some(raw_input) = raw_input {
 393            self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
 394            self.raw_input = Some(raw_input);
 395        }
 396
 397        if let Some(raw_output) = raw_output {
 398            if self.content.is_empty()
 399                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 400            {
 401                self.content
 402                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 403                        markdown,
 404                    }));
 405            }
 406            self.raw_output = Some(raw_output);
 407        }
 408        Ok(())
 409    }
 410
 411    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 412        self.content.iter().filter_map(|content| match content {
 413            ToolCallContent::Diff(diff) => Some(diff),
 414            ToolCallContent::ContentBlock(_) => None,
 415            ToolCallContent::Terminal(_) => None,
 416        })
 417    }
 418
 419    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 420        self.content.iter().filter_map(|content| match content {
 421            ToolCallContent::Terminal(terminal) => Some(terminal),
 422            ToolCallContent::ContentBlock(_) => None,
 423            ToolCallContent::Diff(_) => None,
 424        })
 425    }
 426
 427    pub fn is_subagent(&self) -> bool {
 428        self.tool_name.as_ref().is_some_and(|s| s == "spawn_agent")
 429            || self.subagent_session_info.is_some()
 430    }
 431
 432    pub fn to_markdown(&self, cx: &App) -> String {
 433        let mut markdown = format!(
 434            "**Tool Call: {}**\nStatus: {}\n\n",
 435            self.label.read(cx).source(),
 436            self.status
 437        );
 438        for content in &self.content {
 439            markdown.push_str(content.to_markdown(cx).as_str());
 440            markdown.push_str("\n\n");
 441        }
 442        markdown
 443    }
 444
 445    async fn resolve_location(
 446        location: acp::ToolCallLocation,
 447        project: WeakEntity<Project>,
 448        cx: &mut AsyncApp,
 449    ) -> Option<ResolvedLocation> {
 450        let buffer = project
 451            .update(cx, |project, cx| {
 452                project
 453                    .project_path_for_absolute_path(&location.path, cx)
 454                    .map(|path| project.open_buffer(path, cx))
 455            })
 456            .ok()??;
 457        let buffer = buffer.await.log_err()?;
 458        let position = buffer.update(cx, |buffer, _| {
 459            let snapshot = buffer.snapshot();
 460            if let Some(row) = location.line {
 461                let column = snapshot.indent_size_for_line(row).len;
 462                let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 463                snapshot.anchor_before(point)
 464            } else {
 465                Anchor::min_for_buffer(snapshot.remote_id())
 466            }
 467        });
 468
 469        Some(ResolvedLocation { buffer, position })
 470    }
 471
 472    fn resolve_locations(
 473        &self,
 474        project: Entity<Project>,
 475        cx: &mut App,
 476    ) -> Task<Vec<Option<ResolvedLocation>>> {
 477        let locations = self.locations.clone();
 478        project.update(cx, |_, cx| {
 479            cx.spawn(async move |project, cx| {
 480                let mut new_locations = Vec::new();
 481                for location in locations {
 482                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 483                }
 484                new_locations
 485            })
 486        })
 487    }
 488}
 489
// Separate so we can hold a strong reference to the buffer
// for saving on the thread
/// A tool call location resolved to a buffer and a position inside it.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ResolvedLocation {
    /// Strong handle to the buffer the location points into.
    buffer: Entity<Buffer>,
    /// Anchor of the location inside `buffer`.
    position: Anchor,
}
 497
 498impl From<&ResolvedLocation> for AgentLocation {
 499    fn from(value: &ResolvedLocation) -> Self {
 500        Self {
 501            buffer: value.buffer.downgrade(),
 502            position: value.position,
 503        }
 504    }
 505}
 506
/// Extra parameters attached to a selected permission option.
#[derive(Debug, Clone)]
pub enum SelectedPermissionParams {
    /// Parameters for a terminal permission.
    /// NOTE(review): presumably the command patterns the choice applies to — confirm.
    Terminal { patterns: Vec<String> },
}
 511
/// The permission option that was selected, plus optional extra parameters.
#[derive(Debug)]
pub struct SelectedPermissionOutcome {
    /// Id of the selected option.
    pub option_id: acp::PermissionOptionId,
    /// Kind of the selected option.
    pub option_kind: acp::PermissionOptionKind,
    /// Optional parameters; `None` unless set via `params`.
    pub params: Option<SelectedPermissionParams>,
}
 518
 519impl SelectedPermissionOutcome {
 520    pub fn new(option_id: acp::PermissionOptionId, option_kind: acp::PermissionOptionKind) -> Self {
 521        Self {
 522            option_id,
 523            option_kind,
 524            params: None,
 525        }
 526    }
 527
 528    pub fn params(mut self, params: Option<SelectedPermissionParams>) -> Self {
 529        self.params = params;
 530        self
 531    }
 532}
 533
 534impl From<SelectedPermissionOutcome> for acp::SelectedPermissionOutcome {
 535    fn from(value: SelectedPermissionOutcome) -> Self {
 536        Self::new(value.option_id)
 537    }
 538}
 539
/// Result of a permission request.
#[derive(Debug)]
pub enum RequestPermissionOutcome {
    /// The request was cancelled without a selection.
    Cancelled,
    /// An option was selected.
    Selected(SelectedPermissionOutcome),
}
 545
 546impl From<RequestPermissionOutcome> for acp::RequestPermissionOutcome {
 547    fn from(value: RequestPermissionOutcome) -> Self {
 548        match value {
 549            RequestPermissionOutcome::Cancelled => Self::Cancelled,
 550            RequestPermissionOutcome::Selected(outcome) => Self::Selected(outcome.into()),
 551        }
 552    }
 553}
 554
/// Status of a [`ToolCall`] as tracked by the thread.
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The tool call hasn't started running yet, but we start showing it to
    /// the user.
    Pending,
    /// The tool call is waiting for confirmation from the user.
    WaitingForConfirmation {
        /// The permission options for this confirmation.
        options: PermissionOptions,
        /// Sender for the selected outcome.
        respond_tx: oneshot::Sender<SelectedPermissionOutcome>,
    },
    /// The tool call is currently running.
    InProgress,
    /// The tool call completed successfully.
    Completed,
    /// The tool call failed.
    Failed,
    /// The user rejected the tool call.
    Rejected,
    /// The user canceled generation so the tool call was canceled.
    Canceled,
}
 576
 577impl From<acp::ToolCallStatus> for ToolCallStatus {
 578    fn from(status: acp::ToolCallStatus) -> Self {
 579        match status {
 580            acp::ToolCallStatus::Pending => Self::Pending,
 581            acp::ToolCallStatus::InProgress => Self::InProgress,
 582            acp::ToolCallStatus::Completed => Self::Completed,
 583            acp::ToolCallStatus::Failed => Self::Failed,
 584            _ => Self::Pending,
 585        }
 586    }
 587}
 588
 589impl Display for ToolCallStatus {
 590    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 591        write!(
 592            f,
 593            "{}",
 594            match self {
 595                ToolCallStatus::Pending => "Pending",
 596                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 597                ToolCallStatus::InProgress => "In Progress",
 598                ToolCallStatus::Completed => "Completed",
 599                ToolCallStatus::Failed => "Failed",
 600                ToolCallStatus::Rejected => "Rejected",
 601                ToolCallStatus::Canceled => "Canceled",
 602            }
 603        )
 604    }
 605}
 606
/// Displayable content accumulated from ACP content blocks.
#[derive(Debug, PartialEq, Clone)]
pub enum ContentBlock {
    /// Nothing has been appended yet.
    Empty,
    /// Markdown text; later blocks are appended to this same entity.
    Markdown { markdown: Entity<Markdown> },
    /// A single resource link, kept as received.
    ResourceLink { resource_link: acp::ResourceLink },
    /// A single successfully decoded image.
    Image { image: Arc<gpui::Image> },
}
 614
 615impl ContentBlock {
 616    pub fn new(
 617        block: acp::ContentBlock,
 618        language_registry: &Arc<LanguageRegistry>,
 619        path_style: PathStyle,
 620        cx: &mut App,
 621    ) -> Self {
 622        let mut this = Self::Empty;
 623        this.append(block, language_registry, path_style, cx);
 624        this
 625    }
 626
 627    pub fn new_combined(
 628        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 629        language_registry: Arc<LanguageRegistry>,
 630        path_style: PathStyle,
 631        cx: &mut App,
 632    ) -> Self {
 633        let mut this = Self::Empty;
 634        for block in blocks {
 635            this.append(block, &language_registry, path_style, cx);
 636        }
 637        this
 638    }
 639
 640    pub fn append(
 641        &mut self,
 642        block: acp::ContentBlock,
 643        language_registry: &Arc<LanguageRegistry>,
 644        path_style: PathStyle,
 645        cx: &mut App,
 646    ) {
 647        match (&mut *self, &block) {
 648            (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
 649                *self = ContentBlock::ResourceLink {
 650                    resource_link: resource_link.clone(),
 651                };
 652            }
 653            (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
 654                if let Some(image) = Self::decode_image(image_content) {
 655                    *self = ContentBlock::Image { image };
 656                } else {
 657                    let new_content = Self::image_md(image_content);
 658                    *self = Self::create_markdown_block(new_content, language_registry, cx);
 659                }
 660            }
 661            (ContentBlock::Empty, _) => {
 662                let new_content = Self::block_string_contents(&block, path_style);
 663                *self = Self::create_markdown_block(new_content, language_registry, cx);
 664            }
 665            (ContentBlock::Markdown { markdown }, _) => {
 666                let new_content = Self::block_string_contents(&block, path_style);
 667                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 668            }
 669            (ContentBlock::ResourceLink { resource_link }, _) => {
 670                let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
 671                let new_content = Self::block_string_contents(&block, path_style);
 672                let combined = format!("{}\n{}", existing_content, new_content);
 673                *self = Self::create_markdown_block(combined, language_registry, cx);
 674            }
 675            (ContentBlock::Image { .. }, _) => {
 676                let new_content = Self::block_string_contents(&block, path_style);
 677                let combined = format!("`Image`\n{}", new_content);
 678                *self = Self::create_markdown_block(combined, language_registry, cx);
 679            }
 680        }
 681    }
 682
 683    fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
 684        use base64::Engine as _;
 685
 686        let bytes = base64::engine::general_purpose::STANDARD
 687            .decode(image_content.data.as_bytes())
 688            .ok()?;
 689        let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
 690        Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
 691    }
 692
 693    fn create_markdown_block(
 694        content: String,
 695        language_registry: &Arc<LanguageRegistry>,
 696        cx: &mut App,
 697    ) -> ContentBlock {
 698        ContentBlock::Markdown {
 699            markdown: cx
 700                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 701        }
 702    }
 703
 704    fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
 705        match block {
 706            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 707            acp::ContentBlock::ResourceLink(resource_link) => {
 708                Self::resource_link_md(&resource_link.uri, path_style)
 709            }
 710            acp::ContentBlock::Resource(acp::EmbeddedResource {
 711                resource:
 712                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 713                        uri,
 714                        ..
 715                    }),
 716                ..
 717            }) => Self::resource_link_md(uri, path_style),
 718            acp::ContentBlock::Image(image) => Self::image_md(image),
 719            _ => String::new(),
 720        }
 721    }
 722
 723    fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
 724        if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
 725            uri.as_link().to_string()
 726        } else {
 727            uri.to_string()
 728        }
 729    }
 730
 731    fn image_md(_image: &acp::ImageContent) -> String {
 732        "`Image`".into()
 733    }
 734
 735    pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 736        match self {
 737            ContentBlock::Empty => "",
 738            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 739            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 740            ContentBlock::Image { .. } => "`Image`",
 741        }
 742    }
 743
 744    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 745        match self {
 746            ContentBlock::Empty => None,
 747            ContentBlock::Markdown { markdown } => Some(markdown),
 748            ContentBlock::ResourceLink { .. } => None,
 749            ContentBlock::Image { .. } => None,
 750        }
 751    }
 752
 753    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 754        match self {
 755            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 756            _ => None,
 757        }
 758    }
 759
 760    pub fn image(&self) -> Option<&Arc<gpui::Image>> {
 761        match self {
 762            ContentBlock::Image { image } => Some(image),
 763            _ => None,
 764        }
 765    }
 766}
 767
/// One content item of a [`ToolCall`].
#[derive(Debug)]
pub enum ToolCallContent {
    /// Regular content (markdown, resource link, or image).
    ContentBlock(ContentBlock),
    /// A diff entity.
    Diff(Entity<Diff>),
    /// A terminal entity.
    Terminal(Entity<Terminal>),
}
 774
 775impl ToolCallContent {
 776    pub fn from_acp(
 777        content: acp::ToolCallContent,
 778        language_registry: Arc<LanguageRegistry>,
 779        path_style: PathStyle,
 780        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 781        cx: &mut App,
 782    ) -> Result<Option<Self>> {
 783        match content {
 784            acp::ToolCallContent::Content(acp::Content { content, .. }) => {
 785                Ok(Some(Self::ContentBlock(ContentBlock::new(
 786                    content,
 787                    &language_registry,
 788                    path_style,
 789                    cx,
 790                ))))
 791            }
 792            acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
 793                Diff::finalized(
 794                    diff.path.to_string_lossy().into_owned(),
 795                    diff.old_text,
 796                    diff.new_text,
 797                    language_registry,
 798                    cx,
 799                )
 800            })))),
 801            acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
 802                .get(&terminal_id)
 803                .cloned()
 804                .map(|terminal| Some(Self::Terminal(terminal)))
 805                .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
 806            _ => Ok(None),
 807        }
 808    }
 809
 810    pub fn update_from_acp(
 811        &mut self,
 812        new: acp::ToolCallContent,
 813        language_registry: Arc<LanguageRegistry>,
 814        path_style: PathStyle,
 815        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 816        cx: &mut App,
 817    ) -> Result<bool> {
 818        let needs_update = match (&self, &new) {
 819            (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
 820                old_diff.read(cx).needs_update(
 821                    new_diff.old_text.as_deref().unwrap_or(""),
 822                    &new_diff.new_text,
 823                    cx,
 824                )
 825            }
 826            _ => true,
 827        };
 828
 829        if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
 830            if needs_update {
 831                *self = update;
 832            }
 833            Ok(true)
 834        } else {
 835            Ok(false)
 836        }
 837    }
 838
 839    pub fn to_markdown(&self, cx: &App) -> String {
 840        match self {
 841            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 842            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 843            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 844        }
 845    }
 846
 847    pub fn image(&self) -> Option<&Arc<gpui::Image>> {
 848        match self {
 849            Self::ContentBlock(content) => content.image(),
 850            _ => None,
 851        }
 852    }
 853}
 854
/// An update targeting an existing tool call.
#[derive(Debug, PartialEq)]
pub enum ToolCallUpdate {
    /// An ACP field update.
    UpdateFields(acp::ToolCallUpdate),
    /// An update carrying a diff entity for the tool call.
    UpdateDiff(ToolCallUpdateDiff),
    /// An update carrying a terminal entity for the tool call.
    UpdateTerminal(ToolCallUpdateTerminal),
}
 861
 862impl ToolCallUpdate {
 863    fn id(&self) -> &acp::ToolCallId {
 864        match self {
 865            Self::UpdateFields(update) => &update.tool_call_id,
 866            Self::UpdateDiff(diff) => &diff.id,
 867            Self::UpdateTerminal(terminal) => &terminal.id,
 868        }
 869    }
 870}
 871
 872impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 873    fn from(update: acp::ToolCallUpdate) -> Self {
 874        Self::UpdateFields(update)
 875    }
 876}
 877
 878impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 879    fn from(diff: ToolCallUpdateDiff) -> Self {
 880        Self::UpdateDiff(diff)
 881    }
 882}
 883
/// Payload of `ToolCallUpdate::UpdateDiff`.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateDiff {
    /// Id of the tool call the diff belongs to.
    pub id: acp::ToolCallId,
    /// The diff entity to associate with the tool call.
    pub diff: Entity<Diff>,
}
 889
 890impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 891    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 892        Self::UpdateTerminal(terminal)
 893    }
 894}
 895
/// Payload of `ToolCallUpdate::UpdateTerminal`.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateTerminal {
    /// Id of the tool call the terminal belongs to.
    pub id: acp::ToolCallId,
    /// The terminal entity to associate with the tool call.
    pub terminal: Entity<Terminal>,
}
 901
/// The agent's plan: an ordered list of entries. Empty by default.
#[derive(Debug, Default)]
pub struct Plan {
    /// Plan entries, in the order they are stored.
    pub entries: Vec<PlanEntry>,
}
 906
/// Summary of a `Plan`, as computed by `Plan::stats`.
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry (in plan order) whose status is `InProgress`, if any.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Count of entries tallied as pending by `Plan::stats`.
    pub pending: u32,
    /// Count of entries whose status is `Completed`.
    pub completed: u32,
}
 913
 914impl Plan {
 915    pub fn is_empty(&self) -> bool {
 916        self.entries.is_empty()
 917    }
 918
 919    pub fn stats(&self) -> PlanStats<'_> {
 920        let mut stats = PlanStats {
 921            in_progress_entry: None,
 922            pending: 0,
 923            completed: 0,
 924        };
 925
 926        for entry in &self.entries {
 927            match &entry.status {
 928                acp::PlanEntryStatus::Pending => {
 929                    stats.pending += 1;
 930                }
 931                acp::PlanEntryStatus::InProgress => {
 932                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 933                    stats.pending += 1;
 934                }
 935                acp::PlanEntryStatus::Completed => {
 936                    stats.completed += 1;
 937                }
 938                _ => {}
 939            }
 940        }
 941
 942        stats
 943    }
 944}
 945
/// A single plan entry, holding its text as a `Markdown` entity.
#[derive(Debug)]
pub struct PlanEntry {
    /// The entry's text.
    pub content: Entity<Markdown>,
    /// Priority as reported over ACP.
    pub priority: acp::PlanEntryPriority,
    /// Status as reported over ACP.
    pub status: acp::PlanEntryStatus,
}
 952
 953impl PlanEntry {
 954    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 955        Self {
 956            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 957            priority: entry.priority,
 958            status: entry.status,
 959        }
 960    }
 961}
 962
/// Token accounting for a thread.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TokenUsage {
    /// Maximum number of tokens; `ratio` treats `0` as "unknown".
    pub max_tokens: u64,
    /// Tokens used so far; compared against `max_tokens` by `ratio`.
    pub used_tokens: u64,
    pub input_tokens: u64,
    pub output_tokens: u64,
    pub max_output_tokens: Option<u64>,
}
 971
/// Fraction of `max_tokens` at or above which `TokenUsage::ratio` reports
/// `TokenUsageRatio::Warning`.
pub const TOKEN_USAGE_WARNING_THRESHOLD: f32 = 0.8;
 973
 974impl TokenUsage {
 975    pub fn ratio(&self) -> TokenUsageRatio {
 976        #[cfg(debug_assertions)]
 977        let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
 978            .unwrap_or(TOKEN_USAGE_WARNING_THRESHOLD.to_string())
 979            .parse()
 980            .unwrap();
 981        #[cfg(not(debug_assertions))]
 982        let warning_threshold: f32 = TOKEN_USAGE_WARNING_THRESHOLD;
 983
 984        // When the maximum is unknown because there is no selected model,
 985        // avoid showing the token limit warning.
 986        if self.max_tokens == 0 {
 987            TokenUsageRatio::Normal
 988        } else if self.used_tokens >= self.max_tokens {
 989            TokenUsageRatio::Exceeded
 990        } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
 991            TokenUsageRatio::Warning
 992        } else {
 993            TokenUsageRatio::Normal
 994        }
 995    }
 996}
 997
/// Result of `TokenUsage::ratio`. Variants are declared in increasing order
/// of severity, which is also the order used by the derived `Ord`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum TokenUsageRatio {
    /// Below the warning threshold, or the limit is unknown.
    Normal,
    /// At or above the warning threshold but below the limit.
    Warning,
    /// Usage has reached or passed the limit.
    Exceeded,
}
1004
/// Retry information carried by `AcpThreadEvent::Retry`.
// NOTE(review): field semantics below are read from the names only; the
// producer of this struct is outside this part of the file.
#[derive(Debug, Clone)]
pub struct RetryStatus {
    pub last_error: SharedString,
    pub attempt: usize,
    pub max_attempts: usize,
    pub started_at: Instant,
    pub duration: Duration,
}
1013
/// State stored in `AcpThread::running_turn`; while it is `Some`,
/// `AcpThread::status` reports `ThreadStatus::Generating`.
struct RunningTurn {
    id: u32,
    send_task: Task<()>,
}
1018
/// A conversation thread with an agent reached through an `AgentConnection`.
///
/// Holds the thread's entries, plan, title state, token usage, terminals, and
/// the buffer used for smooth streaming of assistant text.
pub struct AcpThread {
    session_id: acp::SessionId,
    work_dirs: Option<PathList>,
    parent_session_id: Option<acp::SessionId>,
    /// The title; takes precedence over `provisional_title` in `title()`.
    title: Option<SharedString>,
    /// Fallback title returned by `title()` while `title` is `None`; cleared
    /// whenever a real title is set.
    provisional_title: Option<SharedString>,
    entries: Vec<AgentThreadEntry>,
    plan: Plan,
    project: Entity<Project>,
    action_log: Entity<ActionLog>,
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    turn_id: u32,
    /// `Some` while a turn is running; drives `status()`.
    running_turn: Option<RunningTurn>,
    connection: Rc<dyn AgentConnection>,
    token_usage: Option<TokenUsage>,
    /// Latest prompt capabilities; kept current by `_observe_prompt_capabilities`.
    prompt_capabilities: acp::PromptCapabilities,
    /// Task spawned in `new` that applies capability updates received on the
    /// watch channel and emits `PromptCapabilitiesUpdated`.
    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
    had_error: bool,
    /// The user's unsent prompt text, persisted so it can be restored when reloading the thread.
    draft_prompt: Option<Vec<acp::ContentBlock>>,
    /// The initial scroll position for the thread view, set during session registration.
    ui_scroll_position: Option<gpui::ListOffset>,
    /// Buffer for smooth text streaming. Holds text that has been received from
    /// the model but not yet revealed in the UI. A timer task drains this buffer
    /// gradually to create a fluid typing effect instead of choppy chunk-at-a-time
    /// updates.
    streaming_text_buffer: Option<StreamingTextBuffer>,
}
1050
/// Text waiting to be revealed in a `Markdown` entity, together with the
/// timer task that reveals it a few bytes at a time.
struct StreamingTextBuffer {
    /// Text received from the model but not yet appended to the Markdown source.
    pending: String,
    /// The number of bytes to reveal per timer tick. Recomputed whenever new
    /// text is buffered.
    bytes_to_reveal_per_tick: usize,
    /// The Markdown entity being streamed into.
    target: Entity<Markdown>,
    /// Timer task that periodically moves text from `pending` into `target`.
    _reveal_task: Task<()>,
}
1061
impl StreamingTextBuffer {
    /// The number of milliseconds between each timer tick, controlling how quickly
    /// text is revealed.
    const TASK_UPDATE_MS: u64 = 16;
    /// The time in milliseconds to reveal the entire pending text. The
    /// per-tick byte count is derived as
    /// `ceil(pending_len / REVEAL_TARGET * TASK_UPDATE_MS)`.
    const REVEAL_TARGET: f32 = 200.0;
}
1069
1070impl From<&AcpThread> for ActionLogTelemetry {
1071    fn from(value: &AcpThread) -> Self {
1072        Self {
1073            agent_telemetry_id: value.connection().telemetry_id(),
1074            session_id: value.session_id.0.clone(),
1075        }
1076    }
1077}
1078
/// Events emitted by `AcpThread`.
// NOTE(review): variants without a comment are emitted outside this part of
// the file; their exact triggers were not verified here.
#[derive(Debug)]
pub enum AcpThreadEvent {
    /// A new entry was appended to the thread.
    NewEntry,
    /// The title or provisional title changed.
    TitleUpdated,
    /// Token usage was replaced via `update_token_usage`.
    TokenUsageUpdated,
    /// The entry at the given index was modified.
    EntryUpdated(usize),
    EntriesRemoved(Range<usize>),
    ToolAuthorizationRequested(acp::ToolCallId),
    ToolAuthorizationReceived(acp::ToolCallId),
    /// Forwarded from `update_retry_status`.
    Retry(RetryStatus),
    /// Forwarded from `subagent_spawned` with the subagent's session id.
    SubagentSpawned(acp::SessionId),
    Stopped(acp::StopReason),
    Error,
    LoadError(LoadError),
    /// New prompt capabilities were received and stored.
    PromptCapabilitiesUpdated,
    Refusal,
    /// Forwarded from an `AvailableCommandsUpdate` session update.
    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
    /// Forwarded from a `CurrentModeUpdate` session update.
    ModeUpdated(acp::SessionModeId),
    /// Forwarded from a `ConfigOptionUpdate` session update.
    ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
}
1099
// Allows `AcpThread` to emit `AcpThreadEvent`s through `cx.emit`.
impl EventEmitter<AcpThreadEvent> for AcpThread {}
1101
/// Terminal-related events, each tagged with the `acp::TerminalId` it concerns.
// NOTE(review): the producers and consumers of these events are outside this
// part of the file; variant descriptions are limited to the carried data.
#[derive(Debug, Clone)]
pub enum TerminalProviderEvent {
    /// Carries a terminal entity together with its label, optional working
    /// directory and optional output byte limit.
    Created {
        terminal_id: acp::TerminalId,
        label: String,
        cwd: Option<PathBuf>,
        output_byte_limit: Option<u64>,
        terminal: Entity<::terminal::Terminal>,
    },
    /// Carries a chunk of raw output bytes.
    Output {
        terminal_id: acp::TerminalId,
        data: Vec<u8>,
    },
    /// Carries a new title.
    TitleChanged {
        terminal_id: acp::TerminalId,
        title: String,
    },
    /// Carries the exit status.
    Exit {
        terminal_id: acp::TerminalId,
        status: acp::TerminalExitStatus,
    },
}
1124
/// Terminal-related commands, each tagged with the `acp::TerminalId` it targets.
// NOTE(review): the code that handles these commands is outside this part of
// the file; variant descriptions are limited to the carried data.
#[derive(Debug, Clone)]
pub enum TerminalProviderCommand {
    /// Carries raw input bytes.
    WriteInput {
        terminal_id: acp::TerminalId,
        bytes: Vec<u8>,
    },
    /// Carries new dimensions in columns and rows.
    Resize {
        terminal_id: acp::TerminalId,
        cols: u16,
        rows: u16,
    },
    /// Carries only the terminal id.
    Close {
        terminal_id: acp::TerminalId,
    },
}
1140
/// Coarse thread state returned by `AcpThread::status`.
#[derive(PartialEq, Eq, Debug)]
pub enum ThreadStatus {
    /// No turn is running.
    Idle,
    /// A turn is currently running.
    Generating,
}
1146
/// Errors carried by `AcpThreadEvent::LoadError`. See the `Display` impl for
/// the user-facing message of each variant.
#[derive(Debug, Clone)]
pub enum LoadError {
    /// The version found at `command` is older than `minimum_version`.
    Unsupported {
        command: SharedString,
        current_version: SharedString,
        minimum_version: SharedString,
    },
    /// Installation failed; the payload is the message shown after
    /// "Failed to install: ".
    FailedToInstall(SharedString),
    /// The server process exited with the given status.
    Exited {
        status: ExitStatus,
    },
    /// Any other failure, carried as a preformatted message.
    Other(SharedString),
}
1160
1161impl Display for LoadError {
1162    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1163        match self {
1164            LoadError::Unsupported {
1165                command: path,
1166                current_version,
1167                minimum_version,
1168            } => {
1169                write!(
1170                    f,
1171                    "version {current_version} from {path} is not supported (need at least {minimum_version})"
1172                )
1173            }
1174            LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1175            LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1176            LoadError::Other(msg) => write!(f, "{msg}"),
1177        }
1178    }
1179}
1180
// Marker impl so `LoadError` can be used as a `std::error::Error`.
impl Error for LoadError {}
1182
1183impl AcpThread {
1184    pub fn new(
1185        parent_session_id: Option<acp::SessionId>,
1186        title: Option<SharedString>,
1187        work_dirs: Option<PathList>,
1188        connection: Rc<dyn AgentConnection>,
1189        project: Entity<Project>,
1190        action_log: Entity<ActionLog>,
1191        session_id: acp::SessionId,
1192        mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1193        cx: &mut Context<Self>,
1194    ) -> Self {
1195        let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1196        let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1197            loop {
1198                let caps = prompt_capabilities_rx.recv().await?;
1199                this.update(cx, |this, cx| {
1200                    this.prompt_capabilities = caps;
1201                    cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1202                })?;
1203            }
1204        });
1205
1206        Self {
1207            parent_session_id,
1208            work_dirs,
1209            action_log,
1210            shared_buffers: Default::default(),
1211            entries: Default::default(),
1212            plan: Default::default(),
1213            title,
1214            provisional_title: None,
1215            project,
1216            running_turn: None,
1217            turn_id: 0,
1218            connection,
1219            session_id,
1220            token_usage: None,
1221            prompt_capabilities,
1222            _observe_prompt_capabilities: task,
1223            terminals: HashMap::default(),
1224            pending_terminal_output: HashMap::default(),
1225            pending_terminal_exit: HashMap::default(),
1226            had_error: false,
1227            draft_prompt: None,
1228            ui_scroll_position: None,
1229            streaming_text_buffer: None,
1230        }
1231    }
1232
    /// Returns the parent session's id, if this thread has one.
    pub fn parent_session_id(&self) -> Option<&acp::SessionId> {
        self.parent_session_id.as_ref()
    }
1236
    /// Returns a clone of the most recently stored prompt capabilities.
    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
        self.prompt_capabilities.clone()
    }
1240
    /// Returns the stored draft prompt as a slice of content blocks, if any.
    pub fn draft_prompt(&self) -> Option<&[acp::ContentBlock]> {
        self.draft_prompt.as_deref()
    }
1244
    /// Replaces the stored draft prompt; pass `None` to clear it. Emits no event.
    pub fn set_draft_prompt(&mut self, prompt: Option<Vec<acp::ContentBlock>>) {
        self.draft_prompt = prompt;
    }
1248
    /// Returns the stored scroll position for the thread view, if any.
    pub fn ui_scroll_position(&self) -> Option<gpui::ListOffset> {
        self.ui_scroll_position
    }
1252
    /// Replaces the stored scroll position; pass `None` to clear it. Emits no event.
    pub fn set_ui_scroll_position(&mut self, position: Option<gpui::ListOffset>) {
        self.ui_scroll_position = position;
    }
1256
    /// Returns the agent connection this thread uses.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }
1260
    /// Returns the action log entity associated with this thread.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
1264
    /// Returns the project entity associated with this thread.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
1268
1269    pub fn title(&self) -> Option<SharedString> {
1270        self.title
1271            .clone()
1272            .or_else(|| self.provisional_title.clone())
1273    }
1274
    /// Returns `true` while a provisional title is stored.
    pub fn has_provisional_title(&self) -> bool {
        self.provisional_title.is_some()
    }
1278
    /// Returns `true` when the thread has no entries yet.
    pub fn is_draft(&self) -> bool {
        self.entries.is_empty()
    }
1282
    /// Returns all thread entries in order.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
1286
    /// Returns this thread's session id.
    pub fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
1290
    /// Returns the thread's working directories, if set.
    pub fn work_dirs(&self) -> Option<&PathList> {
        self.work_dirs.as_ref()
    }
1294
    /// Stores `work_dirs` as the thread's working directories. Emits no event.
    pub fn set_work_dirs(&mut self, work_dirs: PathList) {
        self.work_dirs = Some(work_dirs);
    }
1298
1299    pub fn status(&self) -> ThreadStatus {
1300        if self.running_turn.is_some() {
1301            ThreadStatus::Generating
1302        } else {
1303            ThreadStatus::Idle
1304        }
1305    }
1306
    /// Returns the thread's `had_error` flag.
    pub fn had_error(&self) -> bool {
        self.had_error
    }
1310
1311    pub fn is_waiting_for_confirmation(&self) -> bool {
1312        for entry in self.entries.iter().rev() {
1313            match entry {
1314                AgentThreadEntry::UserMessage(_) => return false,
1315                AgentThreadEntry::ToolCall(ToolCall {
1316                    status: ToolCallStatus::WaitingForConfirmation { .. },
1317                    ..
1318                }) => return true,
1319                AgentThreadEntry::ToolCall(_)
1320                | AgentThreadEntry::AssistantMessage(_)
1321                | AgentThreadEntry::CompletedPlan(_) => {}
1322            }
1323        }
1324        false
1325    }
1326
    /// Returns the most recently stored token usage, if any.
    pub fn token_usage(&self) -> Option<&TokenUsage> {
        self.token_usage.as_ref()
    }
1330
1331    pub fn has_pending_edit_tool_calls(&self) -> bool {
1332        for entry in self.entries.iter().rev() {
1333            match entry {
1334                AgentThreadEntry::UserMessage(_) => return false,
1335                AgentThreadEntry::ToolCall(
1336                    call @ ToolCall {
1337                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1338                        ..
1339                    },
1340                ) if call.diffs().next().is_some() => {
1341                    return true;
1342                }
1343                AgentThreadEntry::ToolCall(_)
1344                | AgentThreadEntry::AssistantMessage(_)
1345                | AgentThreadEntry::CompletedPlan(_) => {}
1346            }
1347        }
1348
1349        false
1350    }
1351
1352    pub fn has_in_progress_tool_calls(&self) -> bool {
1353        for entry in self.entries.iter().rev() {
1354            match entry {
1355                AgentThreadEntry::UserMessage(_) => return false,
1356                AgentThreadEntry::ToolCall(ToolCall {
1357                    status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1358                    ..
1359                }) => {
1360                    return true;
1361                }
1362                AgentThreadEntry::ToolCall(_)
1363                | AgentThreadEntry::AssistantMessage(_)
1364                | AgentThreadEntry::CompletedPlan(_) => {}
1365            }
1366        }
1367
1368        false
1369    }
1370
1371    pub fn used_tools_since_last_user_message(&self) -> bool {
1372        for entry in self.entries.iter().rev() {
1373            match entry {
1374                AgentThreadEntry::UserMessage(..) => return false,
1375                AgentThreadEntry::AssistantMessage(..) | AgentThreadEntry::CompletedPlan(..) => {
1376                    continue;
1377                }
1378                AgentThreadEntry::ToolCall(..) => return true,
1379            }
1380        }
1381
1382        false
1383    }
1384
1385    pub fn handle_session_update(
1386        &mut self,
1387        update: acp::SessionUpdate,
1388        cx: &mut Context<Self>,
1389    ) -> Result<(), acp::Error> {
1390        match update {
1391            acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1392                self.push_user_content_block(None, content, cx);
1393            }
1394            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1395                self.push_assistant_content_block(content, false, cx);
1396            }
1397            acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1398                self.push_assistant_content_block(content, true, cx);
1399            }
1400            acp::SessionUpdate::ToolCall(tool_call) => {
1401                self.upsert_tool_call(tool_call, cx)?;
1402            }
1403            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1404                self.update_tool_call(tool_call_update, cx)?;
1405            }
1406            acp::SessionUpdate::Plan(plan) => {
1407                self.update_plan(plan, cx);
1408            }
1409            acp::SessionUpdate::SessionInfoUpdate(info_update) => {
1410                if let acp::MaybeUndefined::Value(title) = info_update.title {
1411                    let had_provisional = self.provisional_title.take().is_some();
1412                    let title: SharedString = title.into();
1413                    if self.title.as_ref() != Some(&title) {
1414                        self.title = Some(title);
1415                        cx.emit(AcpThreadEvent::TitleUpdated);
1416                    } else if had_provisional {
1417                        cx.emit(AcpThreadEvent::TitleUpdated);
1418                    }
1419                }
1420            }
1421            acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1422                available_commands,
1423                ..
1424            }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1425            acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1426                current_mode_id,
1427                ..
1428            }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1429            acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1430                config_options,
1431                ..
1432            }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1433            _ => {}
1434        }
1435        Ok(())
1436    }
1437
    /// Appends a user content block without indentation. Shorthand for
    /// `push_user_content_block_with_indent` with `indented = false`.
    pub fn push_user_content_block(
        &mut self,
        message_id: Option<UserMessageId>,
        chunk: acp::ContentBlock,
        cx: &mut Context<Self>,
    ) {
        self.push_user_content_block_with_indent(message_id, chunk, false, cx)
    }
1446
1447    pub fn push_user_content_block_with_indent(
1448        &mut self,
1449        message_id: Option<UserMessageId>,
1450        chunk: acp::ContentBlock,
1451        indented: bool,
1452        cx: &mut Context<Self>,
1453    ) {
1454        let language_registry = self.project.read(cx).languages().clone();
1455        let path_style = self.project.read(cx).path_style(cx);
1456        let entries_len = self.entries.len();
1457
1458        if let Some(last_entry) = self.entries.last_mut()
1459            && let AgentThreadEntry::UserMessage(UserMessage {
1460                id,
1461                content,
1462                chunks,
1463                indented: existing_indented,
1464                ..
1465            }) = last_entry
1466            && *existing_indented == indented
1467        {
1468            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1469            *id = message_id.or(id.take());
1470            content.append(chunk.clone(), &language_registry, path_style, cx);
1471            chunks.push(chunk);
1472            let idx = entries_len - 1;
1473            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1474        } else {
1475            let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1476            self.push_entry(
1477                AgentThreadEntry::UserMessage(UserMessage {
1478                    id: message_id,
1479                    content,
1480                    chunks: vec![chunk],
1481                    checkpoint: None,
1482                    indented,
1483                }),
1484                cx,
1485            );
1486        }
1487    }
1488
    /// Appends an assistant content block without indentation. Shorthand for
    /// `push_assistant_content_block_with_indent` with `indented = false`.
    pub fn push_assistant_content_block(
        &mut self,
        chunk: acp::ContentBlock,
        is_thought: bool,
        cx: &mut Context<Self>,
    ) {
        self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
    }
1497
1498    pub fn push_assistant_content_block_with_indent(
1499        &mut self,
1500        chunk: acp::ContentBlock,
1501        is_thought: bool,
1502        indented: bool,
1503        cx: &mut Context<Self>,
1504    ) {
1505        let path_style = self.project.read(cx).path_style(cx);
1506
1507        // For text chunks going to an existing Markdown block, buffer for smooth
1508        // streaming instead of appending all at once which may feel more choppy.
1509        if let acp::ContentBlock::Text(text_content) = &chunk {
1510            if let Some(markdown) = self.streaming_markdown_target(is_thought, indented) {
1511                let entries_len = self.entries.len();
1512                cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
1513                self.buffer_streaming_text(&markdown, text_content.text.clone(), cx);
1514                return;
1515            }
1516        }
1517
1518        let language_registry = self.project.read(cx).languages().clone();
1519        let entries_len = self.entries.len();
1520        if let Some(last_entry) = self.entries.last_mut()
1521            && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1522                chunks,
1523                indented: existing_indented,
1524                is_subagent_output: _,
1525            }) = last_entry
1526            && *existing_indented == indented
1527        {
1528            let idx = entries_len - 1;
1529            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1530            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1531            match (chunks.last_mut(), is_thought) {
1532                (Some(AssistantMessageChunk::Message { block }), false)
1533                | (Some(AssistantMessageChunk::Thought { block }), true) => {
1534                    block.append(chunk, &language_registry, path_style, cx)
1535                }
1536                _ => {
1537                    let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1538                    if is_thought {
1539                        chunks.push(AssistantMessageChunk::Thought { block })
1540                    } else {
1541                        chunks.push(AssistantMessageChunk::Message { block })
1542                    }
1543                }
1544            }
1545        } else {
1546            let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1547            let chunk = if is_thought {
1548                AssistantMessageChunk::Thought { block }
1549            } else {
1550                AssistantMessageChunk::Message { block }
1551            };
1552
1553            self.push_entry(
1554                AgentThreadEntry::AssistantMessage(AssistantMessage {
1555                    chunks: vec![chunk],
1556                    indented,
1557                    is_subagent_output: false,
1558                }),
1559                cx,
1560            );
1561        }
1562    }
1563
1564    fn streaming_markdown_target(
1565        &self,
1566        is_thought: bool,
1567        indented: bool,
1568    ) -> Option<Entity<Markdown>> {
1569        let last_entry = self.entries.last()?;
1570        if let AgentThreadEntry::AssistantMessage(AssistantMessage {
1571            chunks,
1572            indented: existing_indented,
1573            ..
1574        }) = last_entry
1575            && *existing_indented == indented
1576            && let [.., chunk] = chunks.as_slice()
1577        {
1578            match (chunk, is_thought) {
1579                (
1580                    AssistantMessageChunk::Message {
1581                        block: ContentBlock::Markdown { markdown },
1582                    },
1583                    false,
1584                )
1585                | (
1586                    AssistantMessageChunk::Thought {
1587                        block: ContentBlock::Markdown { markdown },
1588                    },
1589                    true,
1590                ) => Some(markdown.clone()),
1591                _ => None,
1592            }
1593        } else {
1594            None
1595        }
1596    }
1597
1598    /// Add text to the streaming buffer. If the target changed (e.g. switching
1599    /// from thoughts to message text), flush the old buffer first.
1600    fn buffer_streaming_text(
1601        &mut self,
1602        markdown: &Entity<Markdown>,
1603        text: String,
1604        cx: &mut Context<Self>,
1605    ) {
1606        if let Some(buffer) = &mut self.streaming_text_buffer {
1607            if buffer.target.entity_id() == markdown.entity_id() {
1608                buffer.pending.push_str(&text);
1609
1610                buffer.bytes_to_reveal_per_tick = (buffer.pending.len() as f32
1611                    / StreamingTextBuffer::REVEAL_TARGET
1612                    * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1613                    .ceil() as usize;
1614                return;
1615            }
1616            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1617        }
1618
1619        let target = markdown.clone();
1620        let _reveal_task = self.start_streaming_reveal(cx);
1621        let pending_len = text.len();
1622        let bytes_to_reveal = (pending_len as f32 / StreamingTextBuffer::REVEAL_TARGET
1623            * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1624            .ceil() as usize;
1625        self.streaming_text_buffer = Some(StreamingTextBuffer {
1626            pending: text,
1627            bytes_to_reveal_per_tick: bytes_to_reveal,
1628            target,
1629            _reveal_task,
1630        });
1631    }
1632
1633    /// Flush all buffered streaming text into the Markdown entity immediately.
1634    fn flush_streaming_text(
1635        streaming_text_buffer: &mut Option<StreamingTextBuffer>,
1636        cx: &mut Context<Self>,
1637    ) {
1638        if let Some(buffer) = streaming_text_buffer.take() {
1639            if !buffer.pending.is_empty() {
1640                buffer
1641                    .target
1642                    .update(cx, |markdown, cx| markdown.append(&buffer.pending, cx));
1643            }
1644        }
1645    }
1646
1647    /// Spawns a foreground task that periodically drains
1648    /// `streaming_text_buffer.pending` into the target `Markdown` entity,
1649    /// producing smooth, continuous text output.
1650    fn start_streaming_reveal(&self, cx: &mut Context<Self>) -> Task<()> {
1651        cx.spawn(async move |this, cx| {
1652            loop {
1653                cx.background_executor()
1654                    .timer(Duration::from_millis(StreamingTextBuffer::TASK_UPDATE_MS))
1655                    .await;
1656
1657                let should_continue = this
1658                    .update(cx, |this, cx| {
1659                        let Some(buffer) = &mut this.streaming_text_buffer else {
1660                            return false;
1661                        };
1662
1663                        if buffer.pending.is_empty() {
1664                            return true;
1665                        }
1666
1667                        let pending_len = buffer.pending.len();
1668
1669                        let byte_boundary = buffer
1670                            .pending
1671                            .ceil_char_boundary(buffer.bytes_to_reveal_per_tick)
1672                            .min(pending_len);
1673
1674                        buffer.target.update(cx, |markdown: &mut Markdown, cx| {
1675                            markdown.append(&buffer.pending[..byte_boundary], cx);
1676                            buffer.pending.drain(..byte_boundary);
1677                        });
1678
1679                        true
1680                    })
1681                    .unwrap_or(false);
1682
1683                if !should_continue {
1684                    break;
1685                }
1686            }
1687        })
1688    }
1689
    /// Appends `entry` to the thread and emits `NewEntry`.
    ///
    /// Buffered streaming text is flushed first, so text belonging to the
    /// previous entry is fully written before the new entry is added.
    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
1695
    /// Returns `true` when the connection provides a title setter for this
    /// session (i.e. `connection.set_title` returns `Some`).
    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
        self.connection.set_title(&self.session_id, cx).is_some()
    }
1699
1700    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1701        let had_provisional = self.provisional_title.take().is_some();
1702        if self.title.as_ref() != Some(&title) {
1703            self.title = Some(title.clone());
1704            cx.emit(AcpThreadEvent::TitleUpdated);
1705            if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1706                return set_title.run(title, cx);
1707            }
1708        } else if had_provisional {
1709            cx.emit(AcpThreadEvent::TitleUpdated);
1710        }
1711        Task::ready(Ok(()))
1712    }
1713
    /// Sets a provisional display title without propagating back to the
    /// underlying agent connection. This is used for quick preview titles
    /// (e.g. first 20 chars of the user message) that should be shown
    /// immediately but replaced once the LLM generates a proper title via
    /// `set_title`.
    ///
    /// Always emits `TitleUpdated`. The provisional title is only returned by
    /// `title()` while no real title is set.
    pub fn set_provisional_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
        self.provisional_title = Some(title);
        cx.emit(AcpThreadEvent::TitleUpdated);
    }
1723
    /// Emits `SubagentSpawned` with the given session id. Does not change any
    /// thread state.
    pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
    }
1727
    /// Replaces the stored token usage (`None` clears it) and emits
    /// `TokenUsageUpdated`, even when the value is unchanged.
    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
        self.token_usage = usage;
        cx.emit(AcpThreadEvent::TokenUsageUpdated);
    }
1732
    /// Emits `Retry` with the given status. The status is not stored on the
    /// thread.
    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::Retry(status));
    }
1736
1737    pub fn update_tool_call(
1738        &mut self,
1739        update: impl Into<ToolCallUpdate>,
1740        cx: &mut Context<Self>,
1741    ) -> Result<()> {
1742        let update = update.into();
1743        let languages = self.project.read(cx).languages().clone();
1744        let path_style = self.project.read(cx).path_style(cx);
1745
1746        let ix = match self.index_for_tool_call(update.id()) {
1747            Some(ix) => ix,
1748            None => {
1749                // Tool call not found - create a failed tool call entry
1750                let failed_tool_call = ToolCall {
1751                    id: update.id().clone(),
1752                    label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1753                    kind: acp::ToolKind::Fetch,
1754                    content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1755                        "Tool call not found".into(),
1756                        &languages,
1757                        path_style,
1758                        cx,
1759                    ))],
1760                    status: ToolCallStatus::Failed,
1761                    locations: Vec::new(),
1762                    resolved_locations: Vec::new(),
1763                    raw_input: None,
1764                    raw_input_markdown: None,
1765                    raw_output: None,
1766                    tool_name: None,
1767                    subagent_session_info: None,
1768                };
1769                self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1770                return Ok(());
1771            }
1772        };
1773        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1774            unreachable!()
1775        };
1776
1777        match update {
1778            ToolCallUpdate::UpdateFields(update) => {
1779                let location_updated = update.fields.locations.is_some();
1780                call.update_fields(
1781                    update.fields,
1782                    update.meta,
1783                    languages,
1784                    path_style,
1785                    &self.terminals,
1786                    cx,
1787                )?;
1788                if location_updated {
1789                    self.resolve_locations(update.tool_call_id, cx);
1790                }
1791            }
1792            ToolCallUpdate::UpdateDiff(update) => {
1793                call.content.clear();
1794                call.content.push(ToolCallContent::Diff(update.diff));
1795            }
1796            ToolCallUpdate::UpdateTerminal(update) => {
1797                call.content.clear();
1798                call.content
1799                    .push(ToolCallContent::Terminal(update.terminal));
1800            }
1801        }
1802
1803        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1804
1805        Ok(())
1806    }
1807
1808    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1809    pub fn upsert_tool_call(
1810        &mut self,
1811        tool_call: acp::ToolCall,
1812        cx: &mut Context<Self>,
1813    ) -> Result<(), acp::Error> {
1814        let status = tool_call.status.into();
1815        self.upsert_tool_call_inner(tool_call.into(), status, cx)
1816    }
1817
1818    /// Fails if id does not match an existing entry.
1819    pub fn upsert_tool_call_inner(
1820        &mut self,
1821        update: acp::ToolCallUpdate,
1822        status: ToolCallStatus,
1823        cx: &mut Context<Self>,
1824    ) -> Result<(), acp::Error> {
1825        let language_registry = self.project.read(cx).languages().clone();
1826        let path_style = self.project.read(cx).path_style(cx);
1827        let id = update.tool_call_id.clone();
1828
1829        let agent_telemetry_id = self.connection().telemetry_id();
1830        let session = self.session_id();
1831        let parent_session_id = self.parent_session_id();
1832        if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1833            let status = if matches!(status, ToolCallStatus::Completed) {
1834                "completed"
1835            } else {
1836                "failed"
1837            };
1838            telemetry::event!(
1839                "Agent Tool Call Completed",
1840                agent_telemetry_id,
1841                session,
1842                parent_session_id,
1843                status
1844            );
1845        }
1846
1847        if let Some(ix) = self.index_for_tool_call(&id) {
1848            let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1849                unreachable!()
1850            };
1851
1852            call.update_fields(
1853                update.fields,
1854                update.meta,
1855                language_registry,
1856                path_style,
1857                &self.terminals,
1858                cx,
1859            )?;
1860            call.status = status;
1861
1862            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1863        } else {
1864            let call = ToolCall::from_acp(
1865                update.try_into()?,
1866                status,
1867                language_registry,
1868                self.project.read(cx).path_style(cx),
1869                &self.terminals,
1870                cx,
1871            )?;
1872            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1873        };
1874
1875        self.resolve_locations(id, cx);
1876        Ok(())
1877    }
1878
1879    fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1880        self.entries
1881            .iter()
1882            .enumerate()
1883            .rev()
1884            .find_map(|(index, entry)| {
1885                if let AgentThreadEntry::ToolCall(tool_call) = entry
1886                    && &tool_call.id == id
1887                {
1888                    Some(index)
1889                } else {
1890                    None
1891                }
1892            })
1893    }
1894
1895    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1896        // The tool call we are looking for is typically the last one, or very close to the end.
1897        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1898        self.entries
1899            .iter_mut()
1900            .enumerate()
1901            .rev()
1902            .find_map(|(index, tool_call)| {
1903                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1904                    && &tool_call.id == id
1905                {
1906                    Some((index, tool_call))
1907                } else {
1908                    None
1909                }
1910            })
1911    }
1912
1913    pub fn tool_call(&self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1914        self.entries
1915            .iter()
1916            .enumerate()
1917            .rev()
1918            .find_map(|(index, tool_call)| {
1919                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1920                    && &tool_call.id == id
1921                {
1922                    Some((index, tool_call))
1923                } else {
1924                    None
1925                }
1926            })
1927    }
1928
1929    pub fn tool_call_for_subagent(&self, session_id: &acp::SessionId) -> Option<&ToolCall> {
1930        self.entries.iter().find_map(|entry| match entry {
1931            AgentThreadEntry::ToolCall(tool_call) => {
1932                if let Some(subagent_session_info) = &tool_call.subagent_session_info
1933                    && &subagent_session_info.session_id == session_id
1934                {
1935                    Some(tool_call)
1936                } else {
1937                    None
1938                }
1939            }
1940            _ => None,
1941        })
1942    }
1943
1944    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1945        let project = self.project.clone();
1946        let should_update_agent_location = self.parent_session_id.is_none();
1947        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1948            return;
1949        };
1950        let task = tool_call.resolve_locations(project, cx);
1951        cx.spawn(async move |this, cx| {
1952            let resolved_locations = task.await;
1953
1954            this.update(cx, |this, cx| {
1955                let project = this.project.clone();
1956
1957                for location in resolved_locations.iter().flatten() {
1958                    this.shared_buffers
1959                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1960                }
1961                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1962                    return;
1963                };
1964
1965                if let Some(Some(location)) = resolved_locations.last() {
1966                    project.update(cx, |project, cx| {
1967                        let should_ignore = if let Some(agent_location) = project
1968                            .agent_location()
1969                            .filter(|agent_location| agent_location.buffer == location.buffer)
1970                        {
1971                            let snapshot = location.buffer.read(cx).snapshot();
1972                            let old_position = agent_location.position.to_point(&snapshot);
1973                            let new_position = location.position.to_point(&snapshot);
1974
1975                            // ignore this so that when we get updates from the edit tool
1976                            // the position doesn't reset to the startof line
1977                            old_position.row == new_position.row
1978                                && old_position.column > new_position.column
1979                        } else {
1980                            false
1981                        };
1982                        if !should_ignore && should_update_agent_location {
1983                            project.set_agent_location(Some(location.into()), cx);
1984                        }
1985                    });
1986                }
1987
1988                let resolved_locations = resolved_locations
1989                    .iter()
1990                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
1991                    .collect::<Vec<_>>();
1992
1993                if tool_call.resolved_locations != resolved_locations {
1994                    tool_call.resolved_locations = resolved_locations;
1995                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1996                }
1997            })
1998        })
1999        .detach();
2000    }
2001
2002    pub fn request_tool_call_authorization(
2003        &mut self,
2004        tool_call: acp::ToolCallUpdate,
2005        options: PermissionOptions,
2006        cx: &mut Context<Self>,
2007    ) -> Result<Task<RequestPermissionOutcome>> {
2008        let (tx, rx) = oneshot::channel();
2009
2010        let status = ToolCallStatus::WaitingForConfirmation {
2011            options,
2012            respond_tx: tx,
2013        };
2014
2015        let tool_call_id = tool_call.tool_call_id.clone();
2016        self.upsert_tool_call_inner(tool_call, status, cx)?;
2017        cx.emit(AcpThreadEvent::ToolAuthorizationRequested(
2018            tool_call_id.clone(),
2019        ));
2020
2021        Ok(cx.spawn(async move |this, cx| {
2022            let outcome = match rx.await {
2023                Ok(outcome) => RequestPermissionOutcome::Selected(outcome),
2024                Err(oneshot::Canceled) => RequestPermissionOutcome::Cancelled,
2025            };
2026            this.update(cx, |_this, cx| {
2027                cx.emit(AcpThreadEvent::ToolAuthorizationReceived(tool_call_id))
2028            })
2029            .ok();
2030            outcome
2031        }))
2032    }
2033
2034    pub fn authorize_tool_call(
2035        &mut self,
2036        id: acp::ToolCallId,
2037        outcome: SelectedPermissionOutcome,
2038        cx: &mut Context<Self>,
2039    ) {
2040        let Some((ix, call)) = self.tool_call_mut(&id) else {
2041            return;
2042        };
2043
2044        let new_status = match outcome.option_kind {
2045            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
2046                ToolCallStatus::Rejected
2047            }
2048            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
2049                ToolCallStatus::InProgress
2050            }
2051            _ => ToolCallStatus::InProgress,
2052        };
2053
2054        let curr_status = mem::replace(&mut call.status, new_status);
2055
2056        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
2057            respond_tx.send(outcome).log_err();
2058        } else if cfg!(debug_assertions) {
2059            panic!("tried to authorize an already authorized tool call");
2060        }
2061
2062        cx.emit(AcpThreadEvent::EntryUpdated(ix));
2063    }
2064
2065    pub fn plan(&self) -> &Plan {
2066        &self.plan
2067    }
2068
2069    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
2070        let new_entries_len = request.entries.len();
2071        let mut new_entries = request.entries.into_iter();
2072
2073        // Reuse existing markdown to prevent flickering
2074        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
2075            let PlanEntry {
2076                content,
2077                priority,
2078                status,
2079            } = old;
2080            content.update(cx, |old, cx| {
2081                old.replace(new.content, cx);
2082            });
2083            *priority = new.priority;
2084            *status = new.status;
2085        }
2086        for new in new_entries {
2087            self.plan.entries.push(PlanEntry::from_acp(new, cx))
2088        }
2089        self.plan.entries.truncate(new_entries_len);
2090
2091        cx.notify();
2092    }
2093
2094    pub fn snapshot_completed_plan(&mut self, cx: &mut Context<Self>) {
2095        if !self.plan.is_empty() && self.plan.stats().pending == 0 {
2096            let completed_entries = std::mem::take(&mut self.plan.entries);
2097            self.push_entry(AgentThreadEntry::CompletedPlan(completed_entries), cx);
2098        }
2099    }
2100
2101    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
2102        self.plan
2103            .entries
2104            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
2105        cx.notify();
2106    }
2107
2108    pub fn clear_plan(&mut self, cx: &mut Context<Self>) {
2109        self.plan.entries.clear();
2110        cx.notify();
2111    }
2112
2113    #[cfg(any(test, feature = "test-support"))]
2114    pub fn send_raw(
2115        &mut self,
2116        message: &str,
2117        cx: &mut Context<Self>,
2118    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2119        self.send(vec![message.into()], cx)
2120    }
2121
2122    pub fn send(
2123        &mut self,
2124        message: Vec<acp::ContentBlock>,
2125        cx: &mut Context<Self>,
2126    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2127        let block = ContentBlock::new_combined(
2128            message.clone(),
2129            self.project.read(cx).languages().clone(),
2130            self.project.read(cx).path_style(cx),
2131            cx,
2132        );
2133        let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
2134        let git_store = self.project.read(cx).git_store().clone();
2135
2136        let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
2137            Some(UserMessageId::new())
2138        } else {
2139            None
2140        };
2141
2142        self.run_turn(cx, async move |this, cx| {
2143            this.update(cx, |this, cx| {
2144                this.push_entry(
2145                    AgentThreadEntry::UserMessage(UserMessage {
2146                        id: message_id.clone(),
2147                        content: block,
2148                        chunks: message,
2149                        checkpoint: None,
2150                        indented: false,
2151                    }),
2152                    cx,
2153                );
2154            })
2155            .ok();
2156
2157            let old_checkpoint = git_store
2158                .update(cx, |git, cx| git.checkpoint(cx))
2159                .await
2160                .context("failed to get old checkpoint")
2161                .log_err();
2162            this.update(cx, |this, cx| {
2163                if let Some((_ix, message)) = this.last_user_message() {
2164                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
2165                        git_checkpoint,
2166                        show: false,
2167                    });
2168                }
2169                this.connection.prompt(message_id, request, cx)
2170            })?
2171            .await
2172        })
2173    }
2174
2175    pub fn can_retry(&self, cx: &App) -> bool {
2176        self.connection.retry(&self.session_id, cx).is_some()
2177    }
2178
2179    pub fn retry(
2180        &mut self,
2181        cx: &mut Context<Self>,
2182    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2183        self.run_turn(cx, async move |this, cx| {
2184            this.update(cx, |this, cx| {
2185                this.connection
2186                    .retry(&this.session_id, cx)
2187                    .map(|retry| retry.run(cx))
2188            })?
2189            .context("retrying a session is not supported")?
2190            .await
2191        })
2192    }
2193
2194    fn run_turn(
2195        &mut self,
2196        cx: &mut Context<Self>,
2197        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
2198    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2199        self.clear_completed_plan_entries(cx);
2200        self.had_error = false;
2201
2202        let (tx, rx) = oneshot::channel();
2203        let cancel_task = self.cancel(cx);
2204
2205        self.turn_id += 1;
2206        let turn_id = self.turn_id;
2207        self.running_turn = Some(RunningTurn {
2208            id: turn_id,
2209            send_task: cx.spawn(async move |this, cx| {
2210                cancel_task.await;
2211                tx.send(f(this, cx).await).ok();
2212            }),
2213        });
2214
2215        cx.spawn(async move |this, cx| {
2216            let response = rx.await;
2217
2218            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
2219                .await?;
2220
2221            this.update(cx, |this, cx| {
2222                if this.parent_session_id.is_none() {
2223                    this.project
2224                        .update(cx, |project, cx| project.set_agent_location(None, cx));
2225                }
2226                let Ok(response) = response else {
2227                    // tx dropped, just return
2228                    return Ok(None);
2229                };
2230
2231                let is_same_turn = this
2232                    .running_turn
2233                    .as_ref()
2234                    .is_some_and(|turn| turn_id == turn.id);
2235
2236                // If the user submitted a follow up message, running_turn might
2237                // already point to a different turn. Therefore we only want to
2238                // take the task if it's the same turn.
2239                if is_same_turn {
2240                    this.running_turn.take();
2241                }
2242
2243                match response {
2244                    Ok(r) => {
2245                        Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2246
2247                        if r.stop_reason == acp::StopReason::MaxTokens {
2248                            this.had_error = true;
2249                            cx.emit(AcpThreadEvent::Error);
2250                            log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2251
2252                            let exceeded_max_output_tokens =
2253                                this.token_usage.as_ref().is_some_and(|u| {
2254                                    u.max_output_tokens
2255                                        .is_some_and(|max| u.output_tokens >= max)
2256                                });
2257
2258                            let message = if exceeded_max_output_tokens {
2259                                log::error!(
2260                                    "Max output tokens reached. Usage: {:?}",
2261                                    this.token_usage
2262                                );
2263                                "Maximum output tokens reached"
2264                            } else {
2265                                log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2266                                "Maximum tokens reached"
2267                            };
2268                            return Err(anyhow!(message));
2269                        }
2270
2271                        let canceled = matches!(r.stop_reason, acp::StopReason::Cancelled);
2272                        if canceled {
2273                            this.mark_pending_tools_as_canceled();
2274                        }
2275
2276                        if !canceled {
2277                            this.snapshot_completed_plan(cx);
2278                        }
2279
2280                        // Handle refusal - distinguish between user prompt and tool call refusals
2281                        if let acp::StopReason::Refusal = r.stop_reason {
2282                            this.had_error = true;
2283                            if let Some((user_msg_ix, _)) = this.last_user_message() {
2284                                // Check if there's a completed tool call with results after the last user message
2285                                // This indicates the refusal is in response to tool output, not the user's prompt
2286                                let has_completed_tool_call_after_user_msg =
2287                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
2288                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
2289                                            // Check if the tool call has completed and has output
2290                                            matches!(tool_call.status, ToolCallStatus::Completed)
2291                                                && tool_call.raw_output.is_some()
2292                                        } else {
2293                                            false
2294                                        }
2295                                    });
2296
2297                                if has_completed_tool_call_after_user_msg {
2298                                    // Refusal is due to tool output - don't truncate, just notify
2299                                    // The model refused based on what the tool returned
2300                                    cx.emit(AcpThreadEvent::Refusal);
2301                                } else {
2302                                    // User prompt was refused - truncate back to before the user message
2303                                    let range = user_msg_ix..this.entries.len();
2304                                    if range.start < range.end {
2305                                        this.entries.truncate(user_msg_ix);
2306                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
2307                                    }
2308                                    cx.emit(AcpThreadEvent::Refusal);
2309                                }
2310                            } else {
2311                                // No user message found, treat as general refusal
2312                                cx.emit(AcpThreadEvent::Refusal);
2313                            }
2314                        }
2315
2316                        cx.emit(AcpThreadEvent::Stopped(r.stop_reason));
2317                        Ok(Some(r))
2318                    }
2319                    Err(e) => {
2320                        Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2321
2322                        this.had_error = true;
2323                        cx.emit(AcpThreadEvent::Error);
2324                        log::error!("Error in run turn: {:?}", e);
2325                        Err(e)
2326                    }
2327                }
2328            })?
2329        })
2330        .boxed()
2331    }
2332
2333    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2334        let Some(turn) = self.running_turn.take() else {
2335            return Task::ready(());
2336        };
2337        self.connection.cancel(&self.session_id, cx);
2338
2339        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2340        self.mark_pending_tools_as_canceled();
2341
2342        // Wait for the send task to complete
2343        cx.background_spawn(turn.send_task)
2344    }
2345
2346    fn mark_pending_tools_as_canceled(&mut self) {
2347        for entry in self.entries.iter_mut() {
2348            if let AgentThreadEntry::ToolCall(call) = entry {
2349                let cancel = matches!(
2350                    call.status,
2351                    ToolCallStatus::Pending
2352                        | ToolCallStatus::WaitingForConfirmation { .. }
2353                        | ToolCallStatus::InProgress
2354                );
2355
2356                if cancel {
2357                    call.status = ToolCallStatus::Canceled;
2358                }
2359            }
2360        }
2361    }
2362
2363    /// Restores the git working tree to the state at the given checkpoint (if one exists)
2364    pub fn restore_checkpoint(
2365        &mut self,
2366        id: UserMessageId,
2367        cx: &mut Context<Self>,
2368    ) -> Task<Result<()>> {
2369        let Some((_, message)) = self.user_message_mut(&id) else {
2370            return Task::ready(Err(anyhow!("message not found")));
2371        };
2372
2373        let checkpoint = message
2374            .checkpoint
2375            .as_ref()
2376            .map(|c| c.git_checkpoint.clone());
2377
2378        // Cancel any in-progress generation before restoring
2379        let cancel_task = self.cancel(cx);
2380        let rewind = self.rewind(id.clone(), cx);
2381        let git_store = self.project.read(cx).git_store().clone();
2382
2383        cx.spawn(async move |_, cx| {
2384            cancel_task.await;
2385            rewind.await?;
2386            if let Some(checkpoint) = checkpoint {
2387                git_store
2388                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2389                    .await?;
2390            }
2391
2392            Ok(())
2393        })
2394    }
2395
2396    /// Rewinds this thread to before the entry at `index`, removing it and all
2397    /// subsequent entries while rejecting any action_log changes made from that point.
2398    /// Unlike `restore_checkpoint`, this method does not restore from git.
2399    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2400        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2401            return Task::ready(Err(anyhow!("not supported")));
2402        };
2403
2404        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2405        let telemetry = ActionLogTelemetry::from(&*self);
2406        cx.spawn(async move |this, cx| {
2407            cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2408            this.update(cx, |this, cx| {
2409                if let Some((ix, _)) = this.user_message_mut(&id) {
2410                    // Collect all terminals from entries that will be removed
2411                    let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2412                        .iter()
2413                        .flat_map(|entry| entry.terminals())
2414                        .filter_map(|terminal| terminal.read(cx).id().clone().into())
2415                        .collect();
2416
2417                    let range = ix..this.entries.len();
2418                    this.entries.truncate(ix);
2419                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
2420
2421                    // Kill and remove the terminals
2422                    for terminal_id in terminals_to_remove {
2423                        if let Some(terminal) = this.terminals.remove(&terminal_id) {
2424                            terminal.update(cx, |terminal, cx| {
2425                                terminal.kill(cx);
2426                            });
2427                        }
2428                    }
2429                }
2430                this.action_log().update(cx, |action_log, cx| {
2431                    action_log.reject_all_edits(Some(telemetry), cx)
2432                })
2433            })?
2434            .await;
2435            Ok(())
2436        })
2437    }
2438
2439    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2440        let git_store = self.project.read(cx).git_store().clone();
2441
2442        let Some((_, message)) = self.last_user_message() else {
2443            return Task::ready(Ok(()));
2444        };
2445        let Some(user_message_id) = message.id.clone() else {
2446            return Task::ready(Ok(()));
2447        };
2448        let Some(checkpoint) = message.checkpoint.as_ref() else {
2449            return Task::ready(Ok(()));
2450        };
2451        let old_checkpoint = checkpoint.git_checkpoint.clone();
2452
2453        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2454        cx.spawn(async move |this, cx| {
2455            let Some(new_checkpoint) = new_checkpoint
2456                .await
2457                .context("failed to get new checkpoint")
2458                .log_err()
2459            else {
2460                return Ok(());
2461            };
2462
2463            let equal = git_store
2464                .update(cx, |git, cx| {
2465                    git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2466                })
2467                .await
2468                .unwrap_or(true);
2469
2470            this.update(cx, |this, cx| {
2471                if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2472                    if let Some(checkpoint) = message.checkpoint.as_mut() {
2473                        checkpoint.show = !equal;
2474                        cx.emit(AcpThreadEvent::EntryUpdated(ix));
2475                    }
2476                }
2477            })?;
2478
2479            Ok(())
2480        })
2481    }
2482
2483    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2484        self.entries
2485            .iter_mut()
2486            .enumerate()
2487            .rev()
2488            .find_map(|(ix, entry)| {
2489                if let AgentThreadEntry::UserMessage(message) = entry {
2490                    Some((ix, message))
2491                } else {
2492                    None
2493                }
2494            })
2495    }
2496
2497    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2498        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2499            if let AgentThreadEntry::UserMessage(message) = entry {
2500                if message.id.as_ref() == Some(id) {
2501                    Some((ix, message))
2502                } else {
2503                    None
2504                }
2505            } else {
2506                None
2507            }
2508        })
2509    }
2510
2511    pub fn read_text_file(
2512        &self,
2513        path: PathBuf,
2514        line: Option<u32>,
2515        limit: Option<u32>,
2516        reuse_shared_snapshot: bool,
2517        cx: &mut Context<Self>,
2518    ) -> Task<Result<String, acp::Error>> {
2519        // Args are 1-based, move to 0-based
2520        let line = line.unwrap_or_default().saturating_sub(1);
2521        let limit = limit.unwrap_or(u32::MAX);
2522        let project = self.project.clone();
2523        let action_log = self.action_log.clone();
2524        let should_update_agent_location = self.parent_session_id.is_none();
2525        cx.spawn(async move |this, cx| {
2526            let load = project.update(cx, |project, cx| {
2527                let path = project
2528                    .project_path_for_absolute_path(&path, cx)
2529                    .ok_or_else(|| {
2530                        acp::Error::resource_not_found(Some(path.display().to_string()))
2531                    })?;
2532                Ok::<_, acp::Error>(project.open_buffer(path, cx))
2533            })?;
2534
2535            let buffer = load.await?;
2536
2537            let snapshot = if reuse_shared_snapshot {
2538                this.read_with(cx, |this, _| {
2539                    this.shared_buffers.get(&buffer.clone()).cloned()
2540                })
2541                .log_err()
2542                .flatten()
2543            } else {
2544                None
2545            };
2546
2547            let snapshot = if let Some(snapshot) = snapshot {
2548                snapshot
2549            } else {
2550                action_log.update(cx, |action_log, cx| {
2551                    action_log.buffer_read(buffer.clone(), cx);
2552                });
2553
2554                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
2555                this.update(cx, |this, _| {
2556                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2557                })?;
2558                snapshot
2559            };
2560
2561            let max_point = snapshot.max_point();
2562            let start_position = Point::new(line, 0);
2563
2564            if start_position > max_point {
2565                return Err(acp::Error::invalid_params().data(format!(
2566                    "Attempting to read beyond the end of the file, line {}:{}",
2567                    max_point.row + 1,
2568                    max_point.column
2569                )));
2570            }
2571
2572            let start = snapshot.anchor_before(start_position);
2573            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2574
2575            if should_update_agent_location {
2576                project.update(cx, |project, cx| {
2577                    project.set_agent_location(
2578                        Some(AgentLocation {
2579                            buffer: buffer.downgrade(),
2580                            position: start,
2581                        }),
2582                        cx,
2583                    );
2584                });
2585            }
2586
2587            Ok(snapshot.text_for_range(start..end).collect::<String>())
2588        })
2589    }
2590
2591    pub fn write_text_file(
2592        &self,
2593        path: PathBuf,
2594        content: String,
2595        cx: &mut Context<Self>,
2596    ) -> Task<Result<()>> {
2597        let project = self.project.clone();
2598        let action_log = self.action_log.clone();
2599        let should_update_agent_location = self.parent_session_id.is_none();
2600        cx.spawn(async move |this, cx| {
2601            let load = project.update(cx, |project, cx| {
2602                let path = project
2603                    .project_path_for_absolute_path(&path, cx)
2604                    .context("invalid path")?;
2605                anyhow::Ok(project.open_buffer(path, cx))
2606            });
2607            let buffer = load?.await?;
2608            let snapshot = this.update(cx, |this, cx| {
2609                this.shared_buffers
2610                    .get(&buffer)
2611                    .cloned()
2612                    .unwrap_or_else(|| buffer.read(cx).snapshot())
2613            })?;
2614            let edits = cx
2615                .background_executor()
2616                .spawn(async move {
2617                    let old_text = snapshot.text();
2618                    text_diff(old_text.as_str(), &content)
2619                        .into_iter()
2620                        .map(|(range, replacement)| {
2621                            (snapshot.anchor_range_around(range), replacement)
2622                        })
2623                        .collect::<Vec<_>>()
2624                })
2625                .await;
2626
2627            if should_update_agent_location {
2628                project.update(cx, |project, cx| {
2629                    project.set_agent_location(
2630                        Some(AgentLocation {
2631                            buffer: buffer.downgrade(),
2632                            position: edits
2633                                .last()
2634                                .map(|(range, _)| range.end)
2635                                .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2636                        }),
2637                        cx,
2638                    );
2639                });
2640            }
2641
2642            let format_on_save = cx.update(|cx| {
2643                action_log.update(cx, |action_log, cx| {
2644                    action_log.buffer_read(buffer.clone(), cx);
2645                });
2646
2647                let format_on_save = buffer.update(cx, |buffer, cx| {
2648                    buffer.edit(edits, None, cx);
2649
2650                    let settings =
2651                        language::language_settings::LanguageSettings::for_buffer(buffer, cx);
2652
2653                    settings.format_on_save != FormatOnSave::Off
2654                });
2655                action_log.update(cx, |action_log, cx| {
2656                    action_log.buffer_edited(buffer.clone(), cx);
2657                });
2658                format_on_save
2659            });
2660
2661            if format_on_save {
2662                let format_task = project.update(cx, |project, cx| {
2663                    project.format(
2664                        HashSet::from_iter([buffer.clone()]),
2665                        LspFormatTarget::Buffers,
2666                        false,
2667                        FormatTrigger::Save,
2668                        cx,
2669                    )
2670                });
2671                format_task.await.log_err();
2672
2673                action_log.update(cx, |action_log, cx| {
2674                    action_log.buffer_edited(buffer.clone(), cx);
2675                });
2676            }
2677
2678            project
2679                .update(cx, |project, cx| project.save_buffer(buffer, cx))
2680                .await
2681        })
2682    }
2683
2684    pub fn create_terminal(
2685        &self,
2686        command: String,
2687        args: Vec<String>,
2688        extra_env: Vec<acp::EnvVariable>,
2689        cwd: Option<PathBuf>,
2690        output_byte_limit: Option<u64>,
2691        cx: &mut Context<Self>,
2692    ) -> Task<Result<Entity<Terminal>>> {
2693        let env = match &cwd {
2694            Some(dir) => self.project.update(cx, |project, cx| {
2695                project.environment().update(cx, |env, cx| {
2696                    env.directory_environment(dir.as_path().into(), cx)
2697                })
2698            }),
2699            None => Task::ready(None).shared(),
2700        };
2701        let env = cx.spawn(async move |_, _| {
2702            let mut env = env.await.unwrap_or_default();
2703            // Disables paging for `git` and hopefully other commands
2704            env.insert("PAGER".into(), "".into());
2705            for var in extra_env {
2706                env.insert(var.name, var.value);
2707            }
2708            env
2709        });
2710
2711        let project = self.project.clone();
2712        let language_registry = project.read(cx).languages().clone();
2713        let is_windows = project.read(cx).path_style(cx).is_windows();
2714
2715        let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2716        let terminal_task = cx.spawn({
2717            let terminal_id = terminal_id.clone();
2718            async move |_this, cx| {
2719                let env = env.await;
2720                let shell = project
2721                    .update(cx, |project, cx| {
2722                        project
2723                            .remote_client()
2724                            .and_then(|r| r.read(cx).default_system_shell())
2725                    })
2726                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2727                let (task_command, task_args) =
2728                    ShellBuilder::new(&Shell::Program(shell), is_windows)
2729                        .redirect_stdin_to_dev_null()
2730                        .build(Some(command.clone()), &args);
2731                let terminal = project
2732                    .update(cx, |project, cx| {
2733                        project.create_terminal_task(
2734                            task::SpawnInTerminal {
2735                                command: Some(task_command),
2736                                args: task_args,
2737                                cwd: cwd.clone(),
2738                                env,
2739                                ..Default::default()
2740                            },
2741                            cx,
2742                        )
2743                    })
2744                    .await?;
2745
2746                anyhow::Ok(cx.new(|cx| {
2747                    Terminal::new(
2748                        terminal_id,
2749                        &format!("{} {}", command, args.join(" ")),
2750                        cwd,
2751                        output_byte_limit.map(|l| l as usize),
2752                        terminal,
2753                        language_registry,
2754                        cx,
2755                    )
2756                }))
2757            }
2758        });
2759
2760        cx.spawn(async move |this, cx| {
2761            let terminal = terminal_task.await?;
2762            this.update(cx, |this, _cx| {
2763                this.terminals.insert(terminal_id, terminal.clone());
2764                terminal
2765            })
2766        })
2767    }
2768
2769    pub fn kill_terminal(
2770        &mut self,
2771        terminal_id: acp::TerminalId,
2772        cx: &mut Context<Self>,
2773    ) -> Result<()> {
2774        self.terminals
2775            .get(&terminal_id)
2776            .context("Terminal not found")?
2777            .update(cx, |terminal, cx| {
2778                terminal.kill(cx);
2779            });
2780
2781        Ok(())
2782    }
2783
2784    pub fn release_terminal(
2785        &mut self,
2786        terminal_id: acp::TerminalId,
2787        cx: &mut Context<Self>,
2788    ) -> Result<()> {
2789        self.terminals
2790            .remove(&terminal_id)
2791            .context("Terminal not found")?
2792            .update(cx, |terminal, cx| {
2793                terminal.kill(cx);
2794            });
2795
2796        Ok(())
2797    }
2798
2799    pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2800        self.terminals
2801            .get(&terminal_id)
2802            .context("Terminal not found")
2803            .cloned()
2804    }
2805
2806    pub fn to_markdown(&self, cx: &App) -> String {
2807        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2808    }
2809
2810    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2811        cx.emit(AcpThreadEvent::LoadError(error));
2812    }
2813
2814    pub fn register_terminal_created(
2815        &mut self,
2816        terminal_id: acp::TerminalId,
2817        command_label: String,
2818        working_dir: Option<PathBuf>,
2819        output_byte_limit: Option<u64>,
2820        terminal: Entity<::terminal::Terminal>,
2821        cx: &mut Context<Self>,
2822    ) -> Entity<Terminal> {
2823        let language_registry = self.project.read(cx).languages().clone();
2824
2825        let entity = cx.new(|cx| {
2826            Terminal::new(
2827                terminal_id.clone(),
2828                &command_label,
2829                working_dir.clone(),
2830                output_byte_limit.map(|l| l as usize),
2831                terminal,
2832                language_registry,
2833                cx,
2834            )
2835        });
2836        self.terminals.insert(terminal_id.clone(), entity.clone());
2837        entity
2838    }
2839
2840    pub fn mark_as_subagent_output(&mut self, cx: &mut Context<Self>) {
2841        for entry in self.entries.iter_mut().rev() {
2842            if let AgentThreadEntry::AssistantMessage(assistant_message) = entry {
2843                assistant_message.is_subagent_output = true;
2844                cx.notify();
2845                return;
2846            }
2847        }
2848    }
2849
2850    pub fn on_terminal_provider_event(
2851        &mut self,
2852        event: TerminalProviderEvent,
2853        cx: &mut Context<Self>,
2854    ) {
2855        match event {
2856            TerminalProviderEvent::Created {
2857                terminal_id,
2858                label,
2859                cwd,
2860                output_byte_limit,
2861                terminal,
2862            } => {
2863                let entity = self.register_terminal_created(
2864                    terminal_id.clone(),
2865                    label,
2866                    cwd,
2867                    output_byte_limit,
2868                    terminal,
2869                    cx,
2870                );
2871
2872                if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
2873                    for data in chunks.drain(..) {
2874                        entity.update(cx, |term, cx| {
2875                            term.inner().update(cx, |inner, cx| {
2876                                inner.write_output(&data, cx);
2877                            })
2878                        });
2879                    }
2880                }
2881
2882                if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
2883                    entity.update(cx, |_term, cx| {
2884                        cx.notify();
2885                    });
2886                }
2887
2888                cx.notify();
2889            }
2890            TerminalProviderEvent::Output { terminal_id, data } => {
2891                if let Some(entity) = self.terminals.get(&terminal_id) {
2892                    entity.update(cx, |term, cx| {
2893                        term.inner().update(cx, |inner, cx| {
2894                            inner.write_output(&data, cx);
2895                        })
2896                    });
2897                } else {
2898                    self.pending_terminal_output
2899                        .entry(terminal_id)
2900                        .or_default()
2901                        .push(data);
2902                }
2903            }
2904            TerminalProviderEvent::TitleChanged { terminal_id, title } => {
2905                if let Some(entity) = self.terminals.get(&terminal_id) {
2906                    entity.update(cx, |term, cx| {
2907                        term.inner().update(cx, |inner, cx| {
2908                            inner.breadcrumb_text = title;
2909                            cx.emit(::terminal::Event::BreadcrumbsChanged);
2910                        })
2911                    });
2912                }
2913            }
2914            TerminalProviderEvent::Exit {
2915                terminal_id,
2916                status,
2917            } => {
2918                if let Some(entity) = self.terminals.get(&terminal_id) {
2919                    entity.update(cx, |_term, cx| {
2920                        cx.notify();
2921                    });
2922                } else {
2923                    self.pending_terminal_exit.insert(terminal_id, status);
2924                }
2925            }
2926        }
2927    }
2928}
2929
2930fn markdown_for_raw_output(
2931    raw_output: &serde_json::Value,
2932    language_registry: &Arc<LanguageRegistry>,
2933    cx: &mut App,
2934) -> Option<Entity<Markdown>> {
2935    match raw_output {
2936        serde_json::Value::Null => None,
2937        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2938            Markdown::new(
2939                value.to_string().into(),
2940                Some(language_registry.clone()),
2941                None,
2942                cx,
2943            )
2944        })),
2945        serde_json::Value::Number(value) => Some(cx.new(|cx| {
2946            Markdown::new(
2947                value.to_string().into(),
2948                Some(language_registry.clone()),
2949                None,
2950                cx,
2951            )
2952        })),
2953        serde_json::Value::String(value) => Some(cx.new(|cx| {
2954            Markdown::new(
2955                value.clone().into(),
2956                Some(language_registry.clone()),
2957                None,
2958                cx,
2959            )
2960        })),
2961        value => Some(cx.new(|cx| {
2962            let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
2963
2964            Markdown::new(
2965                format!("```json\n{}\n```", pretty_json).into(),
2966                Some(language_registry.clone()),
2967                None,
2968                cx,
2969            )
2970        })),
2971    }
2972}
2973
2974#[cfg(test)]
2975mod tests {
2976    use super::*;
2977    use anyhow::anyhow;
2978    use futures::{channel::mpsc, future::LocalBoxFuture, select};
2979    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2980    use indoc::indoc;
2981    use project::{AgentId, FakeFs, Fs};
2982    use rand::{distr, prelude::*};
2983    use serde_json::json;
2984    use settings::SettingsStore;
2985    use smol::stream::StreamExt as _;
2986    use std::{
2987        any::Any,
2988        cell::RefCell,
2989        path::Path,
2990        rc::Rc,
2991        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2992        time::Duration,
2993    };
2994    use util::{path, path_list::PathList};
2995
2996    fn init_test(cx: &mut TestAppContext) {
2997        env_logger::try_init().ok();
2998        cx.update(|cx| {
2999            let settings_store = SettingsStore::test(cx);
3000            cx.set_global(settings_store);
3001        });
3002    }
3003
3004    #[gpui::test]
3005    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
3006        init_test(cx);
3007
3008        let fs = FakeFs::new(cx.executor());
3009        let project = Project::test(fs, [], cx).await;
3010        let connection = Rc::new(FakeAgentConnection::new());
3011        let thread = cx
3012            .update(|cx| {
3013                connection.new_session(
3014                    project,
3015                    PathList::new(&[std::path::Path::new(path!("/test"))]),
3016                    cx,
3017                )
3018            })
3019            .await
3020            .unwrap();
3021
3022        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3023
3024        // Send Output BEFORE Created - should be buffered by acp_thread
3025        thread.update(cx, |thread, cx| {
3026            thread.on_terminal_provider_event(
3027                TerminalProviderEvent::Output {
3028                    terminal_id: terminal_id.clone(),
3029                    data: b"hello buffered".to_vec(),
3030                },
3031                cx,
3032            );
3033        });
3034
3035        // Create a display-only terminal and then send Created
3036        let lower = cx.new(|cx| {
3037            let builder = ::terminal::TerminalBuilder::new_display_only(
3038                ::terminal::terminal_settings::CursorShape::default(),
3039                ::terminal::terminal_settings::AlternateScroll::On,
3040                None,
3041                0,
3042                cx.background_executor(),
3043                PathStyle::local(),
3044            )
3045            .unwrap();
3046            builder.subscribe(cx)
3047        });
3048
3049        thread.update(cx, |thread, cx| {
3050            thread.on_terminal_provider_event(
3051                TerminalProviderEvent::Created {
3052                    terminal_id: terminal_id.clone(),
3053                    label: "Buffered Test".to_string(),
3054                    cwd: None,
3055                    output_byte_limit: None,
3056                    terminal: lower.clone(),
3057                },
3058                cx,
3059            );
3060        });
3061
3062        // After Created, buffered Output should have been flushed into the renderer
3063        let content = thread.read_with(cx, |thread, cx| {
3064            let term = thread.terminal(terminal_id.clone()).unwrap();
3065            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3066        });
3067
3068        assert!(
3069            content.contains("hello buffered"),
3070            "expected buffered output to render, got: {content}"
3071        );
3072    }
3073
3074    #[gpui::test]
3075    async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
3076        init_test(cx);
3077
3078        let fs = FakeFs::new(cx.executor());
3079        let project = Project::test(fs, [], cx).await;
3080        let connection = Rc::new(FakeAgentConnection::new());
3081        let thread = cx
3082            .update(|cx| {
3083                connection.new_session(
3084                    project,
3085                    PathList::new(&[std::path::Path::new(path!("/test"))]),
3086                    cx,
3087                )
3088            })
3089            .await
3090            .unwrap();
3091
3092        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3093
3094        // Send Output BEFORE Created
3095        thread.update(cx, |thread, cx| {
3096            thread.on_terminal_provider_event(
3097                TerminalProviderEvent::Output {
3098                    terminal_id: terminal_id.clone(),
3099                    data: b"pre-exit data".to_vec(),
3100                },
3101                cx,
3102            );
3103        });
3104
3105        // Send Exit BEFORE Created
3106        thread.update(cx, |thread, cx| {
3107            thread.on_terminal_provider_event(
3108                TerminalProviderEvent::Exit {
3109                    terminal_id: terminal_id.clone(),
3110                    status: acp::TerminalExitStatus::new().exit_code(0),
3111                },
3112                cx,
3113            );
3114        });
3115
3116        // Now create a display-only lower-level terminal and send Created
3117        let lower = cx.new(|cx| {
3118            let builder = ::terminal::TerminalBuilder::new_display_only(
3119                ::terminal::terminal_settings::CursorShape::default(),
3120                ::terminal::terminal_settings::AlternateScroll::On,
3121                None,
3122                0,
3123                cx.background_executor(),
3124                PathStyle::local(),
3125            )
3126            .unwrap();
3127            builder.subscribe(cx)
3128        });
3129
3130        thread.update(cx, |thread, cx| {
3131            thread.on_terminal_provider_event(
3132                TerminalProviderEvent::Created {
3133                    terminal_id: terminal_id.clone(),
3134                    label: "Buffered Exit Test".to_string(),
3135                    cwd: None,
3136                    output_byte_limit: None,
3137                    terminal: lower.clone(),
3138                },
3139                cx,
3140            );
3141        });
3142
3143        // Output should be present after Created (flushed from buffer)
3144        let content = thread.read_with(cx, |thread, cx| {
3145            let term = thread.terminal(terminal_id.clone()).unwrap();
3146            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3147        });
3148
3149        assert!(
3150            content.contains("pre-exit data"),
3151            "expected pre-exit data to render, got: {content}"
3152        );
3153    }
3154
3155    /// Test that killing a terminal via Terminal::kill properly:
3156    /// 1. Causes wait_for_exit to complete (doesn't hang forever)
3157    /// 2. The underlying terminal still has the output that was written before the kill
3158    ///
3159    /// This test verifies that the fix to kill_active_task (which now also kills
3160    /// the shell process in addition to the foreground process) properly allows
3161    /// wait_for_exit to complete instead of hanging indefinitely.
3162    #[cfg(unix)]
3163    #[gpui::test]
3164    async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
3165        use std::collections::HashMap;
3166        use task::Shell;
3167        use util::shell_builder::ShellBuilder;
3168
3169        init_test(cx);
3170        cx.executor().allow_parking();
3171
3172        let fs = FakeFs::new(cx.executor());
3173        let project = Project::test(fs, [], cx).await;
3174        let connection = Rc::new(FakeAgentConnection::new());
3175        let thread = cx
3176            .update(|cx| {
3177                connection.new_session(
3178                    project.clone(),
3179                    PathList::new(&[Path::new(path!("/test"))]),
3180                    cx,
3181                )
3182            })
3183            .await
3184            .unwrap();
3185
3186        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3187
3188        // Create a real PTY terminal that runs a command which prints output then sleeps
3189        // We use printf instead of echo and chain with && sleep to ensure proper execution
3190        let (completion_tx, _completion_rx) = smol::channel::unbounded();
3191        let (program, args) = ShellBuilder::new(&Shell::System, false).build(
3192            Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
3193            &[],
3194        );
3195
3196        let builder = cx
3197            .update(|cx| {
3198                ::terminal::TerminalBuilder::new(
3199                    None,
3200                    None,
3201                    task::Shell::WithArguments {
3202                        program,
3203                        args,
3204                        title_override: None,
3205                    },
3206                    HashMap::default(),
3207                    ::terminal::terminal_settings::CursorShape::default(),
3208                    ::terminal::terminal_settings::AlternateScroll::On,
3209                    None,
3210                    vec![],
3211                    0,
3212                    false,
3213                    0,
3214                    Some(completion_tx),
3215                    cx,
3216                    vec![],
3217                    PathStyle::local(),
3218                )
3219            })
3220            .await
3221            .unwrap();
3222
3223        let lower_terminal = cx.new(|cx| builder.subscribe(cx));
3224
3225        // Create the acp_thread Terminal wrapper
3226        thread.update(cx, |thread, cx| {
3227            thread.on_terminal_provider_event(
3228                TerminalProviderEvent::Created {
3229                    terminal_id: terminal_id.clone(),
3230                    label: "printf output_before_kill && sleep 60".to_string(),
3231                    cwd: None,
3232                    output_byte_limit: None,
3233                    terminal: lower_terminal.clone(),
3234                },
3235                cx,
3236            );
3237        });
3238
3239        // Poll until the printf command produces output, rather than using a
3240        // fixed sleep which is flaky on loaded machines.
3241        let deadline = std::time::Instant::now() + Duration::from_secs(10);
3242        loop {
3243            let has_output = thread.read_with(cx, |thread, cx| {
3244                let term = thread
3245                    .terminals
3246                    .get(&terminal_id)
3247                    .expect("terminal not found");
3248                let content = term.read(cx).inner().read(cx).get_content();
3249                content.contains("output_before_kill")
3250            });
3251            if has_output {
3252                break;
3253            }
3254            assert!(
3255                std::time::Instant::now() < deadline,
3256                "Timed out waiting for printf output to appear in terminal",
3257            );
3258            cx.executor().timer(Duration::from_millis(50)).await;
3259        }
3260
3261        // Get the acp_thread Terminal and kill it
3262        let wait_for_exit = thread.update(cx, |thread, cx| {
3263            let term = thread.terminals.get(&terminal_id).unwrap();
3264            let wait_for_exit = term.read(cx).wait_for_exit();
3265            term.update(cx, |term, cx| {
3266                term.kill(cx);
3267            });
3268            wait_for_exit
3269        });
3270
3271        // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
3272        // Before the fix to kill_active_task, this would hang forever because
3273        // only the foreground process was killed, not the shell, so the PTY
3274        // child never exited and wait_for_completed_task never completed.
3275        let exit_result = futures::select! {
3276            result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
3277            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
3278        };
3279
3280        assert!(
3281            exit_result.is_some(),
3282            "wait_for_exit should complete after kill, but it timed out. \
3283            This indicates kill_active_task is not properly killing the shell process."
3284        );
3285
3286        // Give the system a chance to process any pending updates
3287        cx.run_until_parked();
3288
3289        // Verify that the underlying terminal still has the output that was
3290        // written before the kill. This verifies that killing doesn't lose output.
3291        let inner_content = thread.read_with(cx, |thread, cx| {
3292            let term = thread.terminals.get(&terminal_id).unwrap();
3293            term.read(cx).inner().read(cx).get_content()
3294        });
3295
3296        assert!(
3297            inner_content.contains("output_before_kill"),
3298            "Underlying terminal should contain output from before kill, got: {}",
3299            inner_content
3300        );
3301    }
3302
3303    #[gpui::test]
3304    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
3305        init_test(cx);
3306
3307        let fs = FakeFs::new(cx.executor());
3308        let project = Project::test(fs, [], cx).await;
3309        let connection = Rc::new(FakeAgentConnection::new());
3310        let thread = cx
3311            .update(|cx| {
3312                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3313            })
3314            .await
3315            .unwrap();
3316
3317        // Test creating a new user message
3318        thread.update(cx, |thread, cx| {
3319            thread.push_user_content_block(None, "Hello, ".into(), cx);
3320        });
3321
3322        thread.update(cx, |thread, cx| {
3323            assert_eq!(thread.entries.len(), 1);
3324            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3325                assert_eq!(user_msg.id, None);
3326                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
3327            } else {
3328                panic!("Expected UserMessage");
3329            }
3330        });
3331
3332        // Test appending to existing user message
3333        let message_1_id = UserMessageId::new();
3334        thread.update(cx, |thread, cx| {
3335            thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
3336        });
3337
3338        thread.update(cx, |thread, cx| {
3339            assert_eq!(thread.entries.len(), 1);
3340            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3341                assert_eq!(user_msg.id, Some(message_1_id));
3342                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
3343            } else {
3344                panic!("Expected UserMessage");
3345            }
3346        });
3347
3348        // Test creating new user message after assistant message
3349        thread.update(cx, |thread, cx| {
3350            thread.push_assistant_content_block("Assistant response".into(), false, cx);
3351        });
3352
3353        let message_2_id = UserMessageId::new();
3354        thread.update(cx, |thread, cx| {
3355            thread.push_user_content_block(
3356                Some(message_2_id.clone()),
3357                "New user message".into(),
3358                cx,
3359            );
3360        });
3361
3362        thread.update(cx, |thread, cx| {
3363            assert_eq!(thread.entries.len(), 3);
3364            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
3365                assert_eq!(user_msg.id, Some(message_2_id));
3366                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
3367            } else {
3368                panic!("Expected UserMessage at index 2");
3369            }
3370        });
3371    }
3372
3373    #[gpui::test]
3374    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
3375        init_test(cx);
3376
3377        let fs = FakeFs::new(cx.executor());
3378        let project = Project::test(fs, [], cx).await;
3379        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3380            |_, thread, mut cx| {
3381                async move {
3382                    thread.update(&mut cx, |thread, cx| {
3383                        thread
3384                            .handle_session_update(
3385                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3386                                    "Thinking ".into(),
3387                                )),
3388                                cx,
3389                            )
3390                            .unwrap();
3391                        thread
3392                            .handle_session_update(
3393                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3394                                    "hard!".into(),
3395                                )),
3396                                cx,
3397                            )
3398                            .unwrap();
3399                    })?;
3400                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3401                }
3402                .boxed_local()
3403            },
3404        ));
3405
3406        let thread = cx
3407            .update(|cx| {
3408                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3409            })
3410            .await
3411            .unwrap();
3412
3413        thread
3414            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3415            .await
3416            .unwrap();
3417
3418        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3419        assert_eq!(
3420            output,
3421            indoc! {r#"
3422            ## User
3423
3424            Hello from Zed!
3425
3426            ## Assistant
3427
3428            <thinking>
3429            Thinking hard!
3430            </thinking>
3431
3432            "#}
3433        );
3434    }
3435
3436    #[gpui::test]
3437    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
3438        init_test(cx);
3439
3440        let fs = FakeFs::new(cx.executor());
3441        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3442            .await;
3443        let project = Project::test(fs.clone(), [], cx).await;
3444        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3445        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3446        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3447            move |_, thread, mut cx| {
3448                let read_file_tx = read_file_tx.clone();
3449                async move {
3450                    let content = thread
3451                        .update(&mut cx, |thread, cx| {
3452                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3453                        })
3454                        .unwrap()
3455                        .await
3456                        .unwrap();
3457                    assert_eq!(content, "one\ntwo\nthree\n");
3458                    read_file_tx.take().unwrap().send(()).unwrap();
3459                    thread
3460                        .update(&mut cx, |thread, cx| {
3461                            thread.write_text_file(
3462                                path!("/tmp/foo").into(),
3463                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
3464                                cx,
3465                            )
3466                        })
3467                        .unwrap()
3468                        .await
3469                        .unwrap();
3470                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3471                }
3472                .boxed_local()
3473            },
3474        ));
3475
3476        let (worktree, pathbuf) = project
3477            .update(cx, |project, cx| {
3478                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3479            })
3480            .await
3481            .unwrap();
3482        let buffer = project
3483            .update(cx, |project, cx| {
3484                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3485            })
3486            .await
3487            .unwrap();
3488
3489        let thread = cx
3490            .update(|cx| {
3491                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3492            })
3493            .await
3494            .unwrap();
3495
3496        let request = thread.update(cx, |thread, cx| {
3497            thread.send_raw("Extend the count in /tmp/foo", cx)
3498        });
3499        read_file_rx.await.ok();
3500        buffer.update(cx, |buffer, cx| {
3501            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3502        });
3503        cx.run_until_parked();
3504        assert_eq!(
3505            buffer.read_with(cx, |buffer, _| buffer.text()),
3506            "zero\none\ntwo\nthree\nfour\nfive\n"
3507        );
3508        assert_eq!(
3509            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3510            "zero\none\ntwo\nthree\nfour\nfive\n"
3511        );
3512        request.await.unwrap();
3513    }
3514
3515    #[gpui::test]
3516    async fn test_reading_from_line(cx: &mut TestAppContext) {
3517        init_test(cx);
3518
3519        let fs = FakeFs::new(cx.executor());
3520        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3521            .await;
3522        let project = Project::test(fs.clone(), [], cx).await;
3523        project
3524            .update(cx, |project, cx| {
3525                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3526            })
3527            .await
3528            .unwrap();
3529
3530        let connection = Rc::new(FakeAgentConnection::new());
3531
3532        let thread = cx
3533            .update(|cx| {
3534                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3535            })
3536            .await
3537            .unwrap();
3538
3539        // Whole file
3540        let content = thread
3541            .update(cx, |thread, cx| {
3542                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3543            })
3544            .await
3545            .unwrap();
3546
3547        assert_eq!(content, "one\ntwo\nthree\nfour\n");
3548
3549        // Only start line
3550        let content = thread
3551            .update(cx, |thread, cx| {
3552                thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3553            })
3554            .await
3555            .unwrap();
3556
3557        assert_eq!(content, "three\nfour\n");
3558
3559        // Only limit
3560        let content = thread
3561            .update(cx, |thread, cx| {
3562                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3563            })
3564            .await
3565            .unwrap();
3566
3567        assert_eq!(content, "one\ntwo\n");
3568
3569        // Range
3570        let content = thread
3571            .update(cx, |thread, cx| {
3572                thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3573            })
3574            .await
3575            .unwrap();
3576
3577        assert_eq!(content, "two\nthree\n");
3578
3579        // Invalid
3580        let err = thread
3581            .update(cx, |thread, cx| {
3582                thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3583            })
3584            .await
3585            .unwrap_err();
3586
3587        assert_eq!(
3588            err.to_string(),
3589            "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3590        );
3591    }
3592
3593    #[gpui::test]
3594    async fn test_reading_empty_file(cx: &mut TestAppContext) {
3595        init_test(cx);
3596
3597        let fs = FakeFs::new(cx.executor());
3598        fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3599        let project = Project::test(fs.clone(), [], cx).await;
3600        project
3601            .update(cx, |project, cx| {
3602                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3603            })
3604            .await
3605            .unwrap();
3606
3607        let connection = Rc::new(FakeAgentConnection::new());
3608
3609        let thread = cx
3610            .update(|cx| {
3611                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3612            })
3613            .await
3614            .unwrap();
3615
3616        // Whole file
3617        let content = thread
3618            .update(cx, |thread, cx| {
3619                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3620            })
3621            .await
3622            .unwrap();
3623
3624        assert_eq!(content, "");
3625
3626        // Only start line
3627        let content = thread
3628            .update(cx, |thread, cx| {
3629                thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3630            })
3631            .await
3632            .unwrap();
3633
3634        assert_eq!(content, "");
3635
3636        // Only limit
3637        let content = thread
3638            .update(cx, |thread, cx| {
3639                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3640            })
3641            .await
3642            .unwrap();
3643
3644        assert_eq!(content, "");
3645
3646        // Range
3647        let content = thread
3648            .update(cx, |thread, cx| {
3649                thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3650            })
3651            .await
3652            .unwrap();
3653
3654        assert_eq!(content, "");
3655
3656        // Invalid
3657        let err = thread
3658            .update(cx, |thread, cx| {
3659                thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3660            })
3661            .await
3662            .unwrap_err();
3663
3664        assert_eq!(
3665            err.to_string(),
3666            "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3667        );
3668    }
3669    #[gpui::test]
3670    async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3671        init_test(cx);
3672
3673        let fs = FakeFs::new(cx.executor());
3674        fs.insert_tree(path!("/tmp"), json!({})).await;
3675        let project = Project::test(fs.clone(), [], cx).await;
3676        project
3677            .update(cx, |project, cx| {
3678                project.find_or_create_worktree(path!("/tmp"), true, cx)
3679            })
3680            .await
3681            .unwrap();
3682
3683        let connection = Rc::new(FakeAgentConnection::new());
3684
3685        let thread = cx
3686            .update(|cx| {
3687                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3688            })
3689            .await
3690            .unwrap();
3691
3692        // Out of project file
3693        let err = thread
3694            .update(cx, |thread, cx| {
3695                thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3696            })
3697            .await
3698            .unwrap_err();
3699
3700        assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3701    }
3702
3703    #[gpui::test]
3704    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
3705        init_test(cx);
3706
3707        let fs = FakeFs::new(cx.executor());
3708        let project = Project::test(fs, [], cx).await;
3709        let id = acp::ToolCallId::new("test");
3710
3711        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3712            let id = id.clone();
3713            move |_, thread, mut cx| {
3714                let id = id.clone();
3715                async move {
3716                    thread
3717                        .update(&mut cx, |thread, cx| {
3718                            thread.handle_session_update(
3719                                acp::SessionUpdate::ToolCall(
3720                                    acp::ToolCall::new(id.clone(), "Label")
3721                                        .kind(acp::ToolKind::Fetch)
3722                                        .status(acp::ToolCallStatus::InProgress),
3723                                ),
3724                                cx,
3725                            )
3726                        })
3727                        .unwrap()
3728                        .unwrap();
3729                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3730                }
3731                .boxed_local()
3732            }
3733        }));
3734
3735        let thread = cx
3736            .update(|cx| {
3737                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3738            })
3739            .await
3740            .unwrap();
3741
3742        let request = thread.update(cx, |thread, cx| {
3743            thread.send_raw("Fetch https://example.com", cx)
3744        });
3745
3746        run_until_first_tool_call(&thread, cx).await;
3747
3748        thread.read_with(cx, |thread, _| {
3749            assert!(matches!(
3750                thread.entries[1],
3751                AgentThreadEntry::ToolCall(ToolCall {
3752                    status: ToolCallStatus::InProgress,
3753                    ..
3754                })
3755            ));
3756        });
3757
3758        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3759
3760        thread.read_with(cx, |thread, _| {
3761            assert!(matches!(
3762                &thread.entries[1],
3763                AgentThreadEntry::ToolCall(ToolCall {
3764                    status: ToolCallStatus::Canceled,
3765                    ..
3766                })
3767            ));
3768        });
3769
3770        thread
3771            .update(cx, |thread, cx| {
3772                thread.handle_session_update(
3773                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3774                        id,
3775                        acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3776                    )),
3777                    cx,
3778                )
3779            })
3780            .unwrap();
3781
3782        request.await.unwrap();
3783
3784        thread.read_with(cx, |thread, _| {
3785            assert!(matches!(
3786                thread.entries[1],
3787                AgentThreadEntry::ToolCall(ToolCall {
3788                    status: ToolCallStatus::Completed,
3789                    ..
3790                })
3791            ));
3792        });
3793    }
3794
3795    #[gpui::test]
3796    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3797        init_test(cx);
3798        let fs = FakeFs::new(cx.background_executor.clone());
3799        fs.insert_tree(path!("/test"), json!({})).await;
3800        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3801
3802        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3803            move |_, thread, mut cx| {
3804                async move {
3805                    thread
3806                        .update(&mut cx, |thread, cx| {
3807                            thread.handle_session_update(
3808                                acp::SessionUpdate::ToolCall(
3809                                    acp::ToolCall::new("test", "Label")
3810                                        .kind(acp::ToolKind::Edit)
3811                                        .status(acp::ToolCallStatus::Completed)
3812                                        .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3813                                            "/test/test.txt",
3814                                            "foo",
3815                                        ))]),
3816                                ),
3817                                cx,
3818                            )
3819                        })
3820                        .unwrap()
3821                        .unwrap();
3822                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3823                }
3824                .boxed_local()
3825            }
3826        }));
3827
3828        let thread = cx
3829            .update(|cx| {
3830                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3831            })
3832            .await
3833            .unwrap();
3834
3835        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3836            .await
3837            .unwrap();
3838
3839        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3840    }
3841
3842    #[gpui::test(iterations = 10)]
3843    async fn test_checkpoints(cx: &mut TestAppContext) {
3844        init_test(cx);
3845        let fs = FakeFs::new(cx.background_executor.clone());
3846        fs.insert_tree(
3847            path!("/test"),
3848            json!({
3849                ".git": {}
3850            }),
3851        )
3852        .await;
3853        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3854
3855        let simulate_changes = Arc::new(AtomicBool::new(true));
3856        let next_filename = Arc::new(AtomicUsize::new(0));
3857        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3858            let simulate_changes = simulate_changes.clone();
3859            let next_filename = next_filename.clone();
3860            let fs = fs.clone();
3861            move |request, thread, mut cx| {
3862                let fs = fs.clone();
3863                let simulate_changes = simulate_changes.clone();
3864                let next_filename = next_filename.clone();
3865                async move {
3866                    if simulate_changes.load(SeqCst) {
3867                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3868                        fs.write(Path::new(&filename), b"").await?;
3869                    }
3870
3871                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3872                        panic!("expected text content block");
3873                    };
3874                    thread.update(&mut cx, |thread, cx| {
3875                        thread
3876                            .handle_session_update(
3877                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3878                                    content.text.to_uppercase().into(),
3879                                )),
3880                                cx,
3881                            )
3882                            .unwrap();
3883                    })?;
3884                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3885                }
3886                .boxed_local()
3887            }
3888        }));
3889        let thread = cx
3890            .update(|cx| {
3891                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3892            })
3893            .await
3894            .unwrap();
3895
3896        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3897            .await
3898            .unwrap();
3899        thread.read_with(cx, |thread, cx| {
3900            assert_eq!(
3901                thread.to_markdown(cx),
3902                indoc! {"
3903                    ## User (checkpoint)
3904
3905                    Lorem
3906
3907                    ## Assistant
3908
3909                    LOREM
3910
3911                "}
3912            );
3913        });
3914        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3915
3916        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3917            .await
3918            .unwrap();
3919        thread.read_with(cx, |thread, cx| {
3920            assert_eq!(
3921                thread.to_markdown(cx),
3922                indoc! {"
3923                    ## User (checkpoint)
3924
3925                    Lorem
3926
3927                    ## Assistant
3928
3929                    LOREM
3930
3931                    ## User (checkpoint)
3932
3933                    ipsum
3934
3935                    ## Assistant
3936
3937                    IPSUM
3938
3939                "}
3940            );
3941        });
3942        assert_eq!(
3943            fs.files(),
3944            vec![
3945                Path::new(path!("/test/file-0")),
3946                Path::new(path!("/test/file-1"))
3947            ]
3948        );
3949
3950        // Checkpoint isn't stored when there are no changes.
3951        simulate_changes.store(false, SeqCst);
3952        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3953            .await
3954            .unwrap();
3955        thread.read_with(cx, |thread, cx| {
3956            assert_eq!(
3957                thread.to_markdown(cx),
3958                indoc! {"
3959                    ## User (checkpoint)
3960
3961                    Lorem
3962
3963                    ## Assistant
3964
3965                    LOREM
3966
3967                    ## User (checkpoint)
3968
3969                    ipsum
3970
3971                    ## Assistant
3972
3973                    IPSUM
3974
3975                    ## User
3976
3977                    dolor
3978
3979                    ## Assistant
3980
3981                    DOLOR
3982
3983                "}
3984            );
3985        });
3986        assert_eq!(
3987            fs.files(),
3988            vec![
3989                Path::new(path!("/test/file-0")),
3990                Path::new(path!("/test/file-1"))
3991            ]
3992        );
3993
3994        // Rewinding the conversation truncates the history and restores the checkpoint.
3995        thread
3996            .update(cx, |thread, cx| {
3997                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3998                    panic!("unexpected entries {:?}", thread.entries)
3999                };
4000                thread.restore_checkpoint(message.id.clone().unwrap(), cx)
4001            })
4002            .await
4003            .unwrap();
4004        thread.read_with(cx, |thread, cx| {
4005            assert_eq!(
4006                thread.to_markdown(cx),
4007                indoc! {"
4008                    ## User (checkpoint)
4009
4010                    Lorem
4011
4012                    ## Assistant
4013
4014                    LOREM
4015
4016                "}
4017            );
4018        });
4019        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
4020    }
4021
4022    #[gpui::test]
4023    async fn test_tool_result_refusal(cx: &mut TestAppContext) {
4024        use std::sync::atomic::AtomicUsize;
4025        init_test(cx);
4026
4027        let fs = FakeFs::new(cx.executor());
4028        let project = Project::test(fs, None, cx).await;
4029
4030        // Create a connection that simulates refusal after tool result
4031        let prompt_count = Arc::new(AtomicUsize::new(0));
4032        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4033            let prompt_count = prompt_count.clone();
4034            move |_request, thread, mut cx| {
4035                let count = prompt_count.fetch_add(1, SeqCst);
4036                async move {
4037                    if count == 0 {
4038                        // First prompt: Generate a tool call with result
4039                        thread.update(&mut cx, |thread, cx| {
4040                            thread
4041                                .handle_session_update(
4042                                    acp::SessionUpdate::ToolCall(
4043                                        acp::ToolCall::new("tool1", "Test Tool")
4044                                            .kind(acp::ToolKind::Fetch)
4045                                            .status(acp::ToolCallStatus::Completed)
4046                                            .raw_input(serde_json::json!({"query": "test"}))
4047                                            .raw_output(serde_json::json!({"result": "inappropriate content"})),
4048                                    ),
4049                                    cx,
4050                                )
4051                                .unwrap();
4052                        })?;
4053
4054                        // Now return refusal because of the tool result
4055                        Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
4056                    } else {
4057                        Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4058                    }
4059                }
4060                .boxed_local()
4061            }
4062        }));
4063
4064        let thread = cx
4065            .update(|cx| {
4066                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4067            })
4068            .await
4069            .unwrap();
4070
4071        // Track if we see a Refusal event
4072        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4073        let saw_refusal_event_captured = saw_refusal_event.clone();
4074        thread.update(cx, |_thread, cx| {
4075            cx.subscribe(
4076                &thread,
4077                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4078                    if matches!(event, AcpThreadEvent::Refusal) {
4079                        *saw_refusal_event_captured.lock().unwrap() = true;
4080                    }
4081                },
4082            )
4083            .detach();
4084        });
4085
4086        // Send a user message - this will trigger tool call and then refusal
4087        let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
4088        cx.background_executor.spawn(send_task).detach();
4089        cx.run_until_parked();
4090
4091        // Verify that:
4092        // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
4093        // 2. The user message was NOT truncated
4094        assert!(
4095            *saw_refusal_event.lock().unwrap(),
4096            "Refusal event should be emitted for tool result refusals"
4097        );
4098
4099        thread.read_with(cx, |thread, _| {
4100            let entries = thread.entries();
4101            assert!(entries.len() >= 2, "Should have user message and tool call");
4102
4103            // Verify user message is still there
4104            assert!(
4105                matches!(entries[0], AgentThreadEntry::UserMessage(_)),
4106                "User message should not be truncated"
4107            );
4108
4109            // Verify tool call is there with result
4110            if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
4111                assert!(
4112                    tool_call.raw_output.is_some(),
4113                    "Tool call should have output"
4114                );
4115            } else {
4116                panic!("Expected tool call at index 1");
4117            }
4118        });
4119    }
4120
4121    #[gpui::test]
4122    async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
4123        init_test(cx);
4124
4125        let fs = FakeFs::new(cx.executor());
4126        let project = Project::test(fs, None, cx).await;
4127
4128        let refuse_next = Arc::new(AtomicBool::new(false));
4129        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4130            let refuse_next = refuse_next.clone();
4131            move |_request, _thread, _cx| {
4132                if refuse_next.load(SeqCst) {
4133                    async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
4134                        .boxed_local()
4135                } else {
4136                    async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
4137                        .boxed_local()
4138                }
4139            }
4140        }));
4141
4142        let thread = cx
4143            .update(|cx| {
4144                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4145            })
4146            .await
4147            .unwrap();
4148
4149        // Track if we see a Refusal event
4150        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4151        let saw_refusal_event_captured = saw_refusal_event.clone();
4152        thread.update(cx, |_thread, cx| {
4153            cx.subscribe(
4154                &thread,
4155                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4156                    if matches!(event, AcpThreadEvent::Refusal) {
4157                        *saw_refusal_event_captured.lock().unwrap() = true;
4158                    }
4159                },
4160            )
4161            .detach();
4162        });
4163
4164        // Send a message that will be refused
4165        refuse_next.store(true, SeqCst);
4166        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4167            .await
4168            .unwrap();
4169
4170        // Verify that a Refusal event WAS emitted for user prompt refusal
4171        assert!(
4172            *saw_refusal_event.lock().unwrap(),
4173            "Refusal event should be emitted for user prompt refusals"
4174        );
4175
4176        // Verify the message was truncated (user prompt refusal)
4177        thread.read_with(cx, |thread, cx| {
4178            assert_eq!(thread.to_markdown(cx), "");
4179        });
4180    }
4181
4182    #[gpui::test]
4183    async fn test_refusal(cx: &mut TestAppContext) {
4184        init_test(cx);
4185        let fs = FakeFs::new(cx.background_executor.clone());
4186        fs.insert_tree(path!("/"), json!({})).await;
4187        let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
4188
4189        let refuse_next = Arc::new(AtomicBool::new(false));
4190        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4191            let refuse_next = refuse_next.clone();
4192            move |request, thread, mut cx| {
4193                let refuse_next = refuse_next.clone();
4194                async move {
4195                    if refuse_next.load(SeqCst) {
4196                        return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
4197                    }
4198
4199                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
4200                        panic!("expected text content block");
4201                    };
4202                    thread.update(&mut cx, |thread, cx| {
4203                        thread
4204                            .handle_session_update(
4205                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
4206                                    content.text.to_uppercase().into(),
4207                                )),
4208                                cx,
4209                            )
4210                            .unwrap();
4211                    })?;
4212                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4213                }
4214                .boxed_local()
4215            }
4216        }));
4217        let thread = cx
4218            .update(|cx| {
4219                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4220            })
4221            .await
4222            .unwrap();
4223
4224        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4225            .await
4226            .unwrap();
4227        thread.read_with(cx, |thread, cx| {
4228            assert_eq!(
4229                thread.to_markdown(cx),
4230                indoc! {"
4231                    ## User
4232
4233                    hello
4234
4235                    ## Assistant
4236
4237                    HELLO
4238
4239                "}
4240            );
4241        });
4242
4243        // Simulate refusing the second message. The message should be truncated
4244        // when a user prompt is refused.
4245        refuse_next.store(true, SeqCst);
4246        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
4247            .await
4248            .unwrap();
4249        thread.read_with(cx, |thread, cx| {
4250            assert_eq!(
4251                thread.to_markdown(cx),
4252                indoc! {"
4253                    ## User
4254
4255                    hello
4256
4257                    ## Assistant
4258
4259                    HELLO
4260
4261                "}
4262            );
4263        });
4264    }
4265
4266    async fn run_until_first_tool_call(
4267        thread: &Entity<AcpThread>,
4268        cx: &mut TestAppContext,
4269    ) -> usize {
4270        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
4271
4272        let subscription = cx.update(|cx| {
4273            cx.subscribe(thread, move |thread, _, cx| {
4274                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
4275                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
4276                        return tx.try_send(ix).unwrap();
4277                    }
4278                }
4279            })
4280        });
4281
4282        select! {
4283            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
4284                panic!("Timeout waiting for tool call")
4285            }
4286            ix = rx.next().fuse() => {
4287                drop(subscription);
4288                ix.unwrap()
4289            }
4290        }
4291    }
4292
4293    #[derive(Clone, Default)]
4294    struct FakeAgentConnection {
4295        auth_methods: Vec<acp::AuthMethod>,
4296        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
4297        set_title_calls: Rc<RefCell<Vec<SharedString>>>,
4298        on_user_message: Option<
4299            Rc<
4300                dyn Fn(
4301                        acp::PromptRequest,
4302                        WeakEntity<AcpThread>,
4303                        AsyncApp,
4304                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4305                    + 'static,
4306            >,
4307        >,
4308    }
4309
4310    impl FakeAgentConnection {
4311        fn new() -> Self {
4312            Self {
4313                auth_methods: Vec::new(),
4314                on_user_message: None,
4315                sessions: Arc::default(),
4316                set_title_calls: Default::default(),
4317            }
4318        }
4319
4320        #[expect(unused)]
4321        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
4322            self.auth_methods = auth_methods;
4323            self
4324        }
4325
4326        fn on_user_message(
4327            mut self,
4328            handler: impl Fn(
4329                acp::PromptRequest,
4330                WeakEntity<AcpThread>,
4331                AsyncApp,
4332            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4333            + 'static,
4334        ) -> Self {
4335            self.on_user_message.replace(Rc::new(handler));
4336            self
4337        }
4338    }
4339
4340    impl AgentConnection for FakeAgentConnection {
4341        fn agent_id(&self) -> AgentId {
4342            AgentId::new("fake")
4343        }
4344
4345        fn telemetry_id(&self) -> SharedString {
4346            "fake".into()
4347        }
4348
4349        fn auth_methods(&self) -> &[acp::AuthMethod] {
4350            &self.auth_methods
4351        }
4352
4353        fn new_session(
4354            self: Rc<Self>,
4355            project: Entity<Project>,
4356            work_dirs: PathList,
4357            cx: &mut App,
4358        ) -> Task<gpui::Result<Entity<AcpThread>>> {
4359            let session_id = acp::SessionId::new(
4360                rand::rng()
4361                    .sample_iter(&distr::Alphanumeric)
4362                    .take(7)
4363                    .map(char::from)
4364                    .collect::<String>(),
4365            );
4366            let action_log = cx.new(|_| ActionLog::new(project.clone()));
4367            let thread = cx.new(|cx| {
4368                AcpThread::new(
4369                    None,
4370                    None,
4371                    Some(work_dirs),
4372                    self.clone(),
4373                    project,
4374                    action_log,
4375                    session_id.clone(),
4376                    watch::Receiver::constant(
4377                        acp::PromptCapabilities::new()
4378                            .image(true)
4379                            .audio(true)
4380                            .embedded_context(true),
4381                    ),
4382                    cx,
4383                )
4384            });
4385            self.sessions.lock().insert(session_id, thread.downgrade());
4386            Task::ready(Ok(thread))
4387        }
4388
4389        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
4390            if self.auth_methods().iter().any(|m| m.id() == &method) {
4391                Task::ready(Ok(()))
4392            } else {
4393                Task::ready(Err(anyhow!("Invalid Auth Method")))
4394            }
4395        }
4396
4397        fn prompt(
4398            &self,
4399            _id: Option<UserMessageId>,
4400            params: acp::PromptRequest,
4401            cx: &mut App,
4402        ) -> Task<gpui::Result<acp::PromptResponse>> {
4403            let sessions = self.sessions.lock();
4404            let thread = sessions.get(&params.session_id).unwrap();
4405            if let Some(handler) = &self.on_user_message {
4406                let handler = handler.clone();
4407                let thread = thread.clone();
4408                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
4409            } else {
4410                Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
4411            }
4412        }
4413
4414        fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
4415
4416        fn truncate(
4417            &self,
4418            session_id: &acp::SessionId,
4419            _cx: &App,
4420        ) -> Option<Rc<dyn AgentSessionTruncate>> {
4421            Some(Rc::new(FakeAgentSessionEditor {
4422                _session_id: session_id.clone(),
4423            }))
4424        }
4425
4426        fn set_title(
4427            &self,
4428            _session_id: &acp::SessionId,
4429            _cx: &App,
4430        ) -> Option<Rc<dyn AgentSessionSetTitle>> {
4431            Some(Rc::new(FakeAgentSessionSetTitle {
4432                calls: self.set_title_calls.clone(),
4433            }))
4434        }
4435
4436        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
4437            self
4438        }
4439    }
4440
4441    struct FakeAgentSessionSetTitle {
4442        calls: Rc<RefCell<Vec<SharedString>>>,
4443    }
4444
4445    impl AgentSessionSetTitle for FakeAgentSessionSetTitle {
4446        fn run(&self, title: SharedString, _cx: &mut App) -> Task<Result<()>> {
4447            self.calls.borrow_mut().push(title);
4448            Task::ready(Ok(()))
4449        }
4450    }
4451
4452    struct FakeAgentSessionEditor {
4453        _session_id: acp::SessionId,
4454    }
4455
4456    impl AgentSessionTruncate for FakeAgentSessionEditor {
4457        fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
4458            Task::ready(Ok(()))
4459        }
4460    }
4461
4462    #[gpui::test]
4463    async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
4464        init_test(cx);
4465
4466        let fs = FakeFs::new(cx.executor());
4467        let project = Project::test(fs, [], cx).await;
4468        let connection = Rc::new(FakeAgentConnection::new());
4469        let thread = cx
4470            .update(|cx| {
4471                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4472            })
4473            .await
4474            .unwrap();
4475
4476        // Try to update a tool call that doesn't exist
4477        let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4478        thread.update(cx, |thread, cx| {
4479            let result = thread.handle_session_update(
4480                acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4481                    nonexistent_id.clone(),
4482                    acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4483                )),
4484                cx,
4485            );
4486
4487            // The update should succeed (not return an error)
4488            assert!(result.is_ok());
4489
4490            // There should now be exactly one entry in the thread
4491            assert_eq!(thread.entries.len(), 1);
4492
4493            // The entry should be a failed tool call
4494            if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4495                assert_eq!(tool_call.id, nonexistent_id);
4496                assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4497                assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4498
4499                // Check that the content contains the error message
4500                assert_eq!(tool_call.content.len(), 1);
4501                if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4502                    match content_block {
4503                        ContentBlock::Markdown { markdown } => {
4504                            let markdown_text = markdown.read(cx).source();
4505                            assert!(markdown_text.contains("Tool call not found"));
4506                        }
4507                        ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4508                        ContentBlock::ResourceLink { .. } => {
4509                            panic!("Expected markdown content, got resource link")
4510                        }
4511                        ContentBlock::Image { .. } => {
4512                            panic!("Expected markdown content, got image")
4513                        }
4514                    }
4515                } else {
4516                    panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4517                }
4518            } else {
4519                panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4520            }
4521        });
4522    }
4523
4524    /// Tests that restoring a checkpoint properly cleans up terminals that were
4525    /// created after that checkpoint, and cancels any in-progress generation.
4526    ///
4527    /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4528    /// that were started after that checkpoint should be terminated, and any in-progress
4529    /// AI generation should be canceled.
4530    #[gpui::test]
4531    async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4532        init_test(cx);
4533
4534        let fs = FakeFs::new(cx.executor());
4535        let project = Project::test(fs, [], cx).await;
4536        let connection = Rc::new(FakeAgentConnection::new());
4537        let thread = cx
4538            .update(|cx| {
4539                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4540            })
4541            .await
4542            .unwrap();
4543
4544        // Send first user message to create a checkpoint
4545        cx.update(|cx| {
4546            thread.update(cx, |thread, cx| {
4547                thread.send(vec!["first message".into()], cx)
4548            })
4549        })
4550        .await
4551        .unwrap();
4552
4553        // Send second message (creates another checkpoint) - we'll restore to this one
4554        cx.update(|cx| {
4555            thread.update(cx, |thread, cx| {
4556                thread.send(vec!["second message".into()], cx)
4557            })
4558        })
4559        .await
4560        .unwrap();
4561
4562        // Create 2 terminals BEFORE the checkpoint that have completed running
4563        let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4564        let mock_terminal_1 = cx.new(|cx| {
4565            let builder = ::terminal::TerminalBuilder::new_display_only(
4566                ::terminal::terminal_settings::CursorShape::default(),
4567                ::terminal::terminal_settings::AlternateScroll::On,
4568                None,
4569                0,
4570                cx.background_executor(),
4571                PathStyle::local(),
4572            )
4573            .unwrap();
4574            builder.subscribe(cx)
4575        });
4576
4577        thread.update(cx, |thread, cx| {
4578            thread.on_terminal_provider_event(
4579                TerminalProviderEvent::Created {
4580                    terminal_id: terminal_id_1.clone(),
4581                    label: "echo 'first'".to_string(),
4582                    cwd: Some(PathBuf::from("/test")),
4583                    output_byte_limit: None,
4584                    terminal: mock_terminal_1.clone(),
4585                },
4586                cx,
4587            );
4588        });
4589
4590        thread.update(cx, |thread, cx| {
4591            thread.on_terminal_provider_event(
4592                TerminalProviderEvent::Output {
4593                    terminal_id: terminal_id_1.clone(),
4594                    data: b"first\n".to_vec(),
4595                },
4596                cx,
4597            );
4598        });
4599
4600        thread.update(cx, |thread, cx| {
4601            thread.on_terminal_provider_event(
4602                TerminalProviderEvent::Exit {
4603                    terminal_id: terminal_id_1.clone(),
4604                    status: acp::TerminalExitStatus::new().exit_code(0),
4605                },
4606                cx,
4607            );
4608        });
4609
4610        let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4611        let mock_terminal_2 = cx.new(|cx| {
4612            let builder = ::terminal::TerminalBuilder::new_display_only(
4613                ::terminal::terminal_settings::CursorShape::default(),
4614                ::terminal::terminal_settings::AlternateScroll::On,
4615                None,
4616                0,
4617                cx.background_executor(),
4618                PathStyle::local(),
4619            )
4620            .unwrap();
4621            builder.subscribe(cx)
4622        });
4623
4624        thread.update(cx, |thread, cx| {
4625            thread.on_terminal_provider_event(
4626                TerminalProviderEvent::Created {
4627                    terminal_id: terminal_id_2.clone(),
4628                    label: "echo 'second'".to_string(),
4629                    cwd: Some(PathBuf::from("/test")),
4630                    output_byte_limit: None,
4631                    terminal: mock_terminal_2.clone(),
4632                },
4633                cx,
4634            );
4635        });
4636
4637        thread.update(cx, |thread, cx| {
4638            thread.on_terminal_provider_event(
4639                TerminalProviderEvent::Output {
4640                    terminal_id: terminal_id_2.clone(),
4641                    data: b"second\n".to_vec(),
4642                },
4643                cx,
4644            );
4645        });
4646
4647        thread.update(cx, |thread, cx| {
4648            thread.on_terminal_provider_event(
4649                TerminalProviderEvent::Exit {
4650                    terminal_id: terminal_id_2.clone(),
4651                    status: acp::TerminalExitStatus::new().exit_code(0),
4652                },
4653                cx,
4654            );
4655        });
4656
4657        // Get the second message ID to restore to
4658        let second_message_id = thread.read_with(cx, |thread, _| {
4659            // At this point we have:
4660            // - Index 0: First user message (with checkpoint)
4661            // - Index 1: Second user message (with checkpoint)
4662            // No assistant responses because FakeAgentConnection just returns EndTurn
4663            let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4664                panic!("expected user message at index 1");
4665            };
4666            message.id.clone().unwrap()
4667        });
4668
4669        // Create a terminal AFTER the checkpoint we'll restore to.
4670        // This simulates the AI agent starting a long-running terminal command.
4671        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4672        let mock_terminal = cx.new(|cx| {
4673            let builder = ::terminal::TerminalBuilder::new_display_only(
4674                ::terminal::terminal_settings::CursorShape::default(),
4675                ::terminal::terminal_settings::AlternateScroll::On,
4676                None,
4677                0,
4678                cx.background_executor(),
4679                PathStyle::local(),
4680            )
4681            .unwrap();
4682            builder.subscribe(cx)
4683        });
4684
4685        // Register the terminal as created
4686        thread.update(cx, |thread, cx| {
4687            thread.on_terminal_provider_event(
4688                TerminalProviderEvent::Created {
4689                    terminal_id: terminal_id.clone(),
4690                    label: "sleep 1000".to_string(),
4691                    cwd: Some(PathBuf::from("/test")),
4692                    output_byte_limit: None,
4693                    terminal: mock_terminal.clone(),
4694                },
4695                cx,
4696            );
4697        });
4698
4699        // Simulate the terminal producing output (still running)
4700        thread.update(cx, |thread, cx| {
4701            thread.on_terminal_provider_event(
4702                TerminalProviderEvent::Output {
4703                    terminal_id: terminal_id.clone(),
4704                    data: b"terminal is running...\n".to_vec(),
4705                },
4706                cx,
4707            );
4708        });
4709
4710        // Create a tool call entry that references this terminal
4711        // This represents the agent requesting a terminal command
4712        thread.update(cx, |thread, cx| {
4713            thread
4714                .handle_session_update(
4715                    acp::SessionUpdate::ToolCall(
4716                        acp::ToolCall::new("terminal-tool-1", "Running command")
4717                            .kind(acp::ToolKind::Execute)
4718                            .status(acp::ToolCallStatus::InProgress)
4719                            .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4720                                terminal_id.clone(),
4721                            ))])
4722                            .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4723                    ),
4724                    cx,
4725                )
4726                .unwrap();
4727        });
4728
4729        // Verify terminal exists and is in the thread
4730        let terminal_exists_before =
4731            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4732        assert!(
4733            terminal_exists_before,
4734            "Terminal should exist before checkpoint restore"
4735        );
4736
4737        // Verify the terminal's underlying task is still running (not completed)
4738        let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4739            let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4740            terminal_entity.read_with(cx, |term, _cx| {
4741                term.output().is_none() // output is None means it's still running
4742            })
4743        });
4744        assert!(
4745            terminal_running_before,
4746            "Terminal should be running before checkpoint restore"
4747        );
4748
4749        // Verify we have the expected entries before restore
4750        let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4751        assert!(
4752            entry_count_before > 1,
4753            "Should have multiple entries before restore"
4754        );
4755
4756        // Restore the checkpoint to the second message.
4757        // This should:
4758        // 1. Cancel any in-progress generation (via the cancel() call)
4759        // 2. Remove the terminal that was created after that point
4760        thread
4761            .update(cx, |thread, cx| {
4762                thread.restore_checkpoint(second_message_id, cx)
4763            })
4764            .await
4765            .unwrap();
4766
4767        // Verify that no send_task is in progress after restore
4768        // (cancel() clears the send_task)
4769        let has_send_task_after = thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4770        assert!(
4771            !has_send_task_after,
4772            "Should not have a send_task after restore (cancel should have cleared it)"
4773        );
4774
4775        // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4776        let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4777        assert_eq!(
4778            entry_count, 1,
4779            "Should have 1 entry after restore (only the first user message)"
4780        );
4781
4782        // Verify the 2 completed terminals from before the checkpoint still exist
4783        let terminal_1_exists = thread.read_with(cx, |thread, _| {
4784            thread.terminals.contains_key(&terminal_id_1)
4785        });
4786        assert!(
4787            terminal_1_exists,
4788            "Terminal 1 (from before checkpoint) should still exist"
4789        );
4790
4791        let terminal_2_exists = thread.read_with(cx, |thread, _| {
4792            thread.terminals.contains_key(&terminal_id_2)
4793        });
4794        assert!(
4795            terminal_2_exists,
4796            "Terminal 2 (from before checkpoint) should still exist"
4797        );
4798
4799        // Verify they're still in completed state
4800        let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4801            let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4802            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4803        });
4804        assert!(terminal_1_completed, "Terminal 1 should still be completed");
4805
4806        let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4807            let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4808            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4809        });
4810        assert!(terminal_2_completed, "Terminal 2 should still be completed");
4811
4812        // Verify the running terminal (created after checkpoint) was removed
4813        let terminal_3_exists =
4814            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4815        assert!(
4816            !terminal_3_exists,
4817            "Terminal 3 (created after checkpoint) should have been removed"
4818        );
4819
4820        // Verify total count is 2 (the two from before the checkpoint)
4821        let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4822        assert_eq!(
4823            terminal_count, 2,
4824            "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4825        );
4826    }
4827
4828    /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4829    /// even when a new user message is added while the async checkpoint comparison is in progress.
4830    ///
4831    /// This is a regression test for a bug where update_last_checkpoint would fail with
4832    /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4833    /// update_last_checkpoint started and when its async closure ran.
4834    #[gpui::test]
4835    async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4836        init_test(cx);
4837
4838        let fs = FakeFs::new(cx.executor());
4839        fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4840            .await;
4841        let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4842
4843        let handler_done = Arc::new(AtomicBool::new(false));
4844        let handler_done_clone = handler_done.clone();
4845        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4846            move |_, _thread, _cx| {
4847                handler_done_clone.store(true, SeqCst);
4848                async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4849            },
4850        ));
4851
4852        let thread = cx
4853            .update(|cx| {
4854                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4855            })
4856            .await
4857            .unwrap();
4858
4859        let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4860        let send_task = cx.background_executor.spawn(send_future);
4861
4862        // Tick until handler completes, then a few more to let update_last_checkpoint start
4863        while !handler_done.load(SeqCst) {
4864            cx.executor().tick();
4865        }
4866        for _ in 0..5 {
4867            cx.executor().tick();
4868        }
4869
4870        thread.update(cx, |thread, cx| {
4871            thread.push_entry(
4872                AgentThreadEntry::UserMessage(UserMessage {
4873                    id: Some(UserMessageId::new()),
4874                    content: ContentBlock::Empty,
4875                    chunks: vec!["Injected message (no checkpoint)".into()],
4876                    checkpoint: None,
4877                    indented: false,
4878                }),
4879                cx,
4880            );
4881        });
4882
4883        cx.run_until_parked();
4884        let result = send_task.await;
4885
4886        assert!(
4887            result.is_ok(),
4888            "send should succeed even when new message added during update_last_checkpoint: {:?}",
4889            result.err()
4890        );
4891    }
4892
4893    /// Tests that when a follow-up message is sent during generation,
4894    /// the first turn completing does NOT clear `running_turn` because
4895    /// it now belongs to the second turn.
4896    #[gpui::test]
4897    async fn test_follow_up_message_during_generation_does_not_clear_turn(cx: &mut TestAppContext) {
4898        init_test(cx);
4899
4900        let fs = FakeFs::new(cx.executor());
4901        let project = Project::test(fs, [], cx).await;
4902
4903        // First handler waits for this signal before completing
4904        let (first_complete_tx, first_complete_rx) = futures::channel::oneshot::channel::<()>();
4905        let first_complete_rx = RefCell::new(Some(first_complete_rx));
4906
4907        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4908            move |params, _thread, _cx| {
4909                let first_complete_rx = first_complete_rx.borrow_mut().take();
4910                let is_first = params
4911                    .prompt
4912                    .iter()
4913                    .any(|c| matches!(c, acp::ContentBlock::Text(t) if t.text.contains("first")));
4914
4915                async move {
4916                    if is_first {
4917                        // First handler waits until signaled
4918                        if let Some(rx) = first_complete_rx {
4919                            rx.await.ok();
4920                        }
4921                    }
4922                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4923                }
4924                .boxed_local()
4925            }
4926        }));
4927
4928        let thread = cx
4929            .update(|cx| {
4930                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4931            })
4932            .await
4933            .unwrap();
4934
4935        // Send first message (turn_id=1) - handler will block
4936        let first_request = thread.update(cx, |thread, cx| thread.send_raw("first", cx));
4937        assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 1);
4938
4939        // Send second message (turn_id=2) while first is still blocked
4940        // This calls cancel() which takes turn 1's running_turn and sets turn 2's
4941        let second_request = thread.update(cx, |thread, cx| thread.send_raw("second", cx));
4942        assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 2);
4943
4944        let running_turn_after_second_send =
4945            thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4946        assert_eq!(
4947            running_turn_after_second_send,
4948            Some(2),
4949            "running_turn should be set to turn 2 after sending second message"
4950        );
4951
4952        // Now signal first handler to complete
4953        first_complete_tx.send(()).ok();
4954
4955        // First request completes - should NOT clear running_turn
4956        // because running_turn now belongs to turn 2
4957        first_request.await.unwrap();
4958
4959        let running_turn_after_first =
4960            thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4961        assert_eq!(
4962            running_turn_after_first,
4963            Some(2),
4964            "first turn completing should not clear running_turn (belongs to turn 2)"
4965        );
4966
4967        // Second request completes - SHOULD clear running_turn
4968        second_request.await.unwrap();
4969
4970        let running_turn_after_second =
4971            thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4972        assert!(
4973            !running_turn_after_second,
4974            "second turn completing should clear running_turn"
4975        );
4976    }
4977
4978    #[gpui::test]
4979    async fn test_send_returns_cancelled_response_and_marks_tools_as_cancelled(
4980        cx: &mut TestAppContext,
4981    ) {
4982        init_test(cx);
4983
4984        let fs = FakeFs::new(cx.executor());
4985        let project = Project::test(fs, [], cx).await;
4986
4987        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4988            move |_params, thread, mut cx| {
4989                async move {
4990                    thread
4991                        .update(&mut cx, |thread, cx| {
4992                            thread.handle_session_update(
4993                                acp::SessionUpdate::ToolCall(
4994                                    acp::ToolCall::new(
4995                                        acp::ToolCallId::new("test-tool"),
4996                                        "Test Tool",
4997                                    )
4998                                    .kind(acp::ToolKind::Fetch)
4999                                    .status(acp::ToolCallStatus::InProgress),
5000                                ),
5001                                cx,
5002                            )
5003                        })
5004                        .unwrap()
5005                        .unwrap();
5006
5007                    Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
5008                }
5009                .boxed_local()
5010            },
5011        ));
5012
5013        let thread = cx
5014            .update(|cx| {
5015                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5016            })
5017            .await
5018            .unwrap();
5019
5020        let response = thread
5021            .update(cx, |thread, cx| thread.send_raw("test message", cx))
5022            .await;
5023
5024        let response = response
5025            .expect("send should succeed")
5026            .expect("should have response");
5027        assert_eq!(
5028            response.stop_reason,
5029            acp::StopReason::Cancelled,
5030            "response should have Cancelled stop_reason"
5031        );
5032
5033        thread.read_with(cx, |thread, _| {
5034            let tool_entry = thread
5035                .entries
5036                .iter()
5037                .find_map(|e| {
5038                    if let AgentThreadEntry::ToolCall(call) = e {
5039                        Some(call)
5040                    } else {
5041                        None
5042                    }
5043                })
5044                .expect("should have tool call entry");
5045
5046            assert!(
5047                matches!(tool_entry.status, ToolCallStatus::Canceled),
5048                "tool should be marked as Canceled when response is Cancelled, got {:?}",
5049                tool_entry.status
5050            );
5051        });
5052    }
5053
5054    #[gpui::test]
5055    async fn test_provisional_title_replaced_by_real_title(cx: &mut TestAppContext) {
5056        init_test(cx);
5057
5058        let fs = FakeFs::new(cx.executor());
5059        let project = Project::test(fs, [], cx).await;
5060        let connection = Rc::new(FakeAgentConnection::new());
5061        let set_title_calls = connection.set_title_calls.clone();
5062
5063        let thread = cx
5064            .update(|cx| {
5065                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5066            })
5067            .await
5068            .unwrap();
5069
5070        // Initial title is the default.
5071        thread.read_with(cx, |thread, _| {
5072            assert_eq!(thread.title(), None);
5073        });
5074
5075        // Setting a provisional title updates the display title.
5076        thread.update(cx, |thread, cx| {
5077            thread.set_provisional_title("Hello, can you help…".into(), cx);
5078        });
5079        thread.read_with(cx, |thread, _| {
5080            assert_eq!(
5081                thread.title().as_ref().map(|s| s.as_str()),
5082                Some("Hello, can you help…")
5083            );
5084        });
5085
5086        // The provisional title should NOT have propagated to the connection.
5087        assert_eq!(
5088            set_title_calls.borrow().len(),
5089            0,
5090            "provisional title should not propagate to the connection"
5091        );
5092
5093        // When the real title arrives via set_title, it replaces the
5094        // provisional title and propagates to the connection.
5095        let task = thread.update(cx, |thread, cx| {
5096            thread.set_title("Helping with Rust question".into(), cx)
5097        });
5098        task.await.expect("set_title should succeed");
5099        thread.read_with(cx, |thread, _| {
5100            assert_eq!(
5101                thread.title().as_ref().map(|s| s.as_str()),
5102                Some("Helping with Rust question")
5103            );
5104        });
5105        assert_eq!(
5106            set_title_calls.borrow().as_slice(),
5107            &[SharedString::from("Helping with Rust question")],
5108            "real title should propagate to the connection"
5109        );
5110    }
5111
5112    #[gpui::test]
5113    async fn test_session_info_update_replaces_provisional_title_and_emits_event(
5114        cx: &mut TestAppContext,
5115    ) {
5116        init_test(cx);
5117
5118        let fs = FakeFs::new(cx.executor());
5119        let project = Project::test(fs, [], cx).await;
5120        let connection = Rc::new(FakeAgentConnection::new());
5121
5122        let thread = cx
5123            .update(|cx| {
5124                connection.clone().new_session(
5125                    project,
5126                    PathList::new(&[Path::new(path!("/test"))]),
5127                    cx,
5128                )
5129            })
5130            .await
5131            .unwrap();
5132
5133        let title_updated_events = Rc::new(RefCell::new(0usize));
5134        let title_updated_events_for_subscription = title_updated_events.clone();
5135        thread.update(cx, |_thread, cx| {
5136            cx.subscribe(
5137                &thread,
5138                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
5139                    if matches!(event, AcpThreadEvent::TitleUpdated) {
5140                        *title_updated_events_for_subscription.borrow_mut() += 1;
5141                    }
5142                },
5143            )
5144            .detach();
5145        });
5146
5147        thread.update(cx, |thread, cx| {
5148            thread.set_provisional_title("Hello, can you help…".into(), cx);
5149        });
5150        assert_eq!(
5151            *title_updated_events.borrow(),
5152            1,
5153            "setting a provisional title should emit TitleUpdated"
5154        );
5155
5156        let result = thread.update(cx, |thread, cx| {
5157            thread.handle_session_update(
5158                acp::SessionUpdate::SessionInfoUpdate(
5159                    acp::SessionInfoUpdate::new().title("Helping with Rust question"),
5160                ),
5161                cx,
5162            )
5163        });
5164        result.expect("session info update should succeed");
5165
5166        thread.read_with(cx, |thread, _| {
5167            assert_eq!(
5168                thread.title().as_ref().map(|s| s.as_str()),
5169                Some("Helping with Rust question")
5170            );
5171            assert!(
5172                !thread.has_provisional_title(),
5173                "session info title update should clear provisional title"
5174            );
5175        });
5176
5177        assert_eq!(
5178            *title_updated_events.borrow(),
5179            2,
5180            "session info title update should emit TitleUpdated"
5181        );
5182        assert!(
5183            connection.set_title_calls.borrow().is_empty(),
5184            "session info title update should not propagate back to the connection"
5185        );
5186    }
5187}