acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5use action_log::{ActionLog, ActionLogTelemetry};
   6use agent_client_protocol::{self as acp};
   7use anyhow::{Context as _, Result, anyhow};
   8use collections::HashSet;
   9pub use connection::*;
  10pub use diff::*;
  11use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  12use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  13use itertools::Itertools;
  14use language::language_settings::FormatOnSave;
  15use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  16use markdown::Markdown;
  17pub use mention::*;
  18use project::lsp_store::{FormatTrigger, LspFormatTarget};
  19use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  20use serde::{Deserialize, Serialize};
  21use serde_json::to_string_pretty;
  22use std::collections::HashMap;
  23use std::error::Error;
  24use std::fmt::{Formatter, Write};
  25use std::ops::Range;
  26use std::process::ExitStatus;
  27use std::rc::Rc;
  28use std::time::{Duration, Instant};
  29use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  30use task::{Shell, ShellBuilder};
  31pub use terminal::*;
  32use text::Bias;
  33use ui::App;
  34use util::markdown::MarkdownEscaped;
  35use util::path_list::PathList;
  36use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
  37use uuid::Uuid;
  38
  39/// Key used in ACP ToolCall meta to store the tool's programmatic name.
  40/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
  41pub const TOOL_NAME_META_KEY: &str = "tool_name";
  42
  43/// Helper to extract tool name from ACP meta
  44pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
  45    meta.as_ref()
  46        .and_then(|m| m.get(TOOL_NAME_META_KEY))
  47        .and_then(|v| v.as_str())
  48        .map(|s| SharedString::from(s.to_owned()))
  49}
  50
  51/// Helper to create meta with tool name
  52pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
  53    acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
  54}
  55
  56/// Key used in ACP ToolCall meta to store the session id and message indexes
  57pub const SUBAGENT_SESSION_INFO_META_KEY: &str = "subagent_session_info";
  58
  59#[derive(Clone, Debug, Deserialize, Serialize)]
  60pub struct SubagentSessionInfo {
  61    /// The session id of the subagent sessiont that was spawned
  62    pub session_id: acp::SessionId,
  63    /// The index of the message of the start of the "turn" run by this tool call
  64    pub message_start_index: usize,
  65    /// The index of the output of the message that the subagent has returned
  66    #[serde(skip_serializing_if = "Option::is_none")]
  67    pub message_end_index: Option<usize>,
  68}
  69
  70/// Helper to extract subagent session id from ACP meta
  71pub fn subagent_session_info_from_meta(meta: &Option<acp::Meta>) -> Option<SubagentSessionInfo> {
  72    meta.as_ref()
  73        .and_then(|m| m.get(SUBAGENT_SESSION_INFO_META_KEY))
  74        .and_then(|v| serde_json::from_value(v.clone()).ok())
  75}
  76
  77#[derive(Debug)]
  78pub struct UserMessage {
  79    pub id: Option<UserMessageId>,
  80    pub content: ContentBlock,
  81    pub chunks: Vec<acp::ContentBlock>,
  82    pub checkpoint: Option<Checkpoint>,
  83    pub indented: bool,
  84}
  85
  86#[derive(Debug)]
  87pub struct Checkpoint {
  88    git_checkpoint: GitStoreCheckpoint,
  89    pub show: bool,
  90}
  91
  92impl UserMessage {
  93    fn to_markdown(&self, cx: &App) -> String {
  94        let mut markdown = String::new();
  95        if self
  96            .checkpoint
  97            .as_ref()
  98            .is_some_and(|checkpoint| checkpoint.show)
  99        {
 100            writeln!(markdown, "## User (checkpoint)").unwrap();
 101        } else {
 102            writeln!(markdown, "## User").unwrap();
 103        }
 104        writeln!(markdown).unwrap();
 105        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
 106        writeln!(markdown).unwrap();
 107        markdown
 108    }
 109}
 110
 111#[derive(Debug, PartialEq)]
 112pub struct AssistantMessage {
 113    pub chunks: Vec<AssistantMessageChunk>,
 114    pub indented: bool,
 115    pub is_subagent_output: bool,
 116}
 117
 118impl AssistantMessage {
 119    pub fn to_markdown(&self, cx: &App) -> String {
 120        format!(
 121            "## Assistant\n\n{}\n\n",
 122            self.chunks
 123                .iter()
 124                .map(|chunk| chunk.to_markdown(cx))
 125                .join("\n\n")
 126        )
 127    }
 128}
 129
 130#[derive(Debug, PartialEq)]
 131pub enum AssistantMessageChunk {
 132    Message { block: ContentBlock },
 133    Thought { block: ContentBlock },
 134}
 135
 136impl AssistantMessageChunk {
 137    pub fn from_str(
 138        chunk: &str,
 139        language_registry: &Arc<LanguageRegistry>,
 140        path_style: PathStyle,
 141        cx: &mut App,
 142    ) -> Self {
 143        Self::Message {
 144            block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
 145        }
 146    }
 147
 148    fn to_markdown(&self, cx: &App) -> String {
 149        match self {
 150            Self::Message { block } => block.to_markdown(cx).to_string(),
 151            Self::Thought { block } => {
 152                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 153            }
 154        }
 155    }
 156}
 157
 158#[derive(Debug)]
 159pub enum AgentThreadEntry {
 160    UserMessage(UserMessage),
 161    AssistantMessage(AssistantMessage),
 162    ToolCall(ToolCall),
 163    CompletedPlan(Vec<PlanEntry>),
 164}
 165
 166impl AgentThreadEntry {
 167    pub fn is_indented(&self) -> bool {
 168        match self {
 169            Self::UserMessage(message) => message.indented,
 170            Self::AssistantMessage(message) => message.indented,
 171            Self::ToolCall(_) => false,
 172            Self::CompletedPlan(_) => false,
 173        }
 174    }
 175
 176    pub fn to_markdown(&self, cx: &App) -> String {
 177        match self {
 178            Self::UserMessage(message) => message.to_markdown(cx),
 179            Self::AssistantMessage(message) => message.to_markdown(cx),
 180            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 181            Self::CompletedPlan(entries) => {
 182                let mut md = String::from("## Plan\n\n");
 183                for entry in entries {
 184                    let source = entry.content.read(cx).source().to_string();
 185                    md.push_str(&format!("- [x] {}\n", source));
 186                }
 187                md
 188            }
 189        }
 190    }
 191
 192    pub fn user_message(&self) -> Option<&UserMessage> {
 193        if let AgentThreadEntry::UserMessage(message) = self {
 194            Some(message)
 195        } else {
 196            None
 197        }
 198    }
 199
 200    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 201        if let AgentThreadEntry::ToolCall(call) = self {
 202            itertools::Either::Left(call.diffs())
 203        } else {
 204            itertools::Either::Right(std::iter::empty())
 205        }
 206    }
 207
 208    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 209        if let AgentThreadEntry::ToolCall(call) = self {
 210            itertools::Either::Left(call.terminals())
 211        } else {
 212            itertools::Either::Right(std::iter::empty())
 213        }
 214    }
 215
 216    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 217        if let AgentThreadEntry::ToolCall(ToolCall {
 218            locations,
 219            resolved_locations,
 220            ..
 221        }) = self
 222        {
 223            Some((
 224                locations.get(ix)?.clone(),
 225                resolved_locations.get(ix)?.clone()?,
 226            ))
 227        } else {
 228            None
 229        }
 230    }
 231}
 232
 233#[derive(Debug)]
 234pub struct ToolCall {
 235    pub id: acp::ToolCallId,
 236    pub label: Entity<Markdown>,
 237    pub kind: acp::ToolKind,
 238    pub content: Vec<ToolCallContent>,
 239    pub status: ToolCallStatus,
 240    pub locations: Vec<acp::ToolCallLocation>,
 241    pub resolved_locations: Vec<Option<AgentLocation>>,
 242    pub raw_input: Option<serde_json::Value>,
 243    pub raw_input_markdown: Option<Entity<Markdown>>,
 244    pub raw_output: Option<serde_json::Value>,
 245    pub tool_name: Option<SharedString>,
 246    pub subagent_session_info: Option<SubagentSessionInfo>,
 247}
 248
 249impl ToolCall {
 250    fn from_acp(
 251        tool_call: acp::ToolCall,
 252        status: ToolCallStatus,
 253        language_registry: Arc<LanguageRegistry>,
 254        path_style: PathStyle,
 255        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 256        cx: &mut App,
 257    ) -> Result<Self> {
 258        let title = if tool_call.kind == acp::ToolKind::Execute {
 259            tool_call.title
 260        } else if tool_call.kind == acp::ToolKind::Edit {
 261            MarkdownEscaped(tool_call.title.as_str()).to_string()
 262        } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
 263            first_line.to_owned() + ""
 264        } else {
 265            tool_call.title
 266        };
 267        let mut content = Vec::with_capacity(tool_call.content.len());
 268        for item in tool_call.content {
 269            if let Some(item) = ToolCallContent::from_acp(
 270                item,
 271                language_registry.clone(),
 272                path_style,
 273                terminals,
 274                cx,
 275            )? {
 276                content.push(item);
 277            }
 278        }
 279
 280        let raw_input_markdown = tool_call
 281            .raw_input
 282            .as_ref()
 283            .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
 284
 285        let tool_name = tool_name_from_meta(&tool_call.meta);
 286
 287        let subagent_session_info = subagent_session_info_from_meta(&tool_call.meta);
 288
 289        let result = Self {
 290            id: tool_call.tool_call_id,
 291            label: cx
 292                .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
 293            kind: tool_call.kind,
 294            content,
 295            locations: tool_call.locations,
 296            resolved_locations: Vec::default(),
 297            status,
 298            raw_input: tool_call.raw_input,
 299            raw_input_markdown,
 300            raw_output: tool_call.raw_output,
 301            tool_name,
 302            subagent_session_info,
 303        };
 304        Ok(result)
 305    }
 306
 307    fn update_fields(
 308        &mut self,
 309        fields: acp::ToolCallUpdateFields,
 310        meta: Option<acp::Meta>,
 311        language_registry: Arc<LanguageRegistry>,
 312        path_style: PathStyle,
 313        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 314        cx: &mut App,
 315    ) -> Result<()> {
 316        let acp::ToolCallUpdateFields {
 317            kind,
 318            status,
 319            title,
 320            content,
 321            locations,
 322            raw_input,
 323            raw_output,
 324            ..
 325        } = fields;
 326
 327        if let Some(kind) = kind {
 328            self.kind = kind;
 329        }
 330
 331        if let Some(status) = status {
 332            self.status = status.into();
 333        }
 334
 335        if let Some(subagent_session_info) = subagent_session_info_from_meta(&meta) {
 336            self.subagent_session_info = Some(subagent_session_info);
 337        }
 338
 339        if let Some(title) = title {
 340            if self.kind == acp::ToolKind::Execute {
 341                for terminal in self.terminals() {
 342                    terminal.update(cx, |terminal, cx| {
 343                        terminal.update_command_label(&title, cx);
 344                    });
 345                }
 346            }
 347            self.label.update(cx, |label, cx| {
 348                if self.kind == acp::ToolKind::Execute {
 349                    label.replace(title, cx);
 350                } else if self.kind == acp::ToolKind::Edit {
 351                    label.replace(MarkdownEscaped(&title).to_string(), cx)
 352                } else if let Some((first_line, _)) = title.split_once("\n") {
 353                    label.replace(first_line.to_owned() + "", cx);
 354                } else {
 355                    label.replace(title, cx);
 356                }
 357            });
 358        }
 359
 360        if let Some(content) = content {
 361            let mut new_content_len = content.len();
 362            let mut content = content.into_iter();
 363
 364            // Reuse existing content if we can
 365            for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
 366                let valid_content =
 367                    old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
 368                if !valid_content {
 369                    new_content_len -= 1;
 370                }
 371            }
 372            for new in content {
 373                if let Some(new) = ToolCallContent::from_acp(
 374                    new,
 375                    language_registry.clone(),
 376                    path_style,
 377                    terminals,
 378                    cx,
 379                )? {
 380                    self.content.push(new);
 381                } else {
 382                    new_content_len -= 1;
 383                }
 384            }
 385            self.content.truncate(new_content_len);
 386        }
 387
 388        if let Some(locations) = locations {
 389            self.locations = locations;
 390        }
 391
 392        if let Some(raw_input) = raw_input {
 393            self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
 394            self.raw_input = Some(raw_input);
 395        }
 396
 397        if let Some(raw_output) = raw_output {
 398            if self.content.is_empty()
 399                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 400            {
 401                self.content
 402                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 403                        markdown,
 404                    }));
 405            }
 406            self.raw_output = Some(raw_output);
 407        }
 408        Ok(())
 409    }
 410
 411    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 412        self.content.iter().filter_map(|content| match content {
 413            ToolCallContent::Diff(diff) => Some(diff),
 414            ToolCallContent::ContentBlock(_) => None,
 415            ToolCallContent::Terminal(_) => None,
 416        })
 417    }
 418
 419    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 420        self.content.iter().filter_map(|content| match content {
 421            ToolCallContent::Terminal(terminal) => Some(terminal),
 422            ToolCallContent::ContentBlock(_) => None,
 423            ToolCallContent::Diff(_) => None,
 424        })
 425    }
 426
 427    pub fn is_subagent(&self) -> bool {
 428        self.tool_name.as_ref().is_some_and(|s| s == "spawn_agent")
 429            || self.subagent_session_info.is_some()
 430    }
 431
 432    pub fn to_markdown(&self, cx: &App) -> String {
 433        let mut markdown = format!(
 434            "**Tool Call: {}**\nStatus: {}\n\n",
 435            self.label.read(cx).source(),
 436            self.status
 437        );
 438        for content in &self.content {
 439            markdown.push_str(content.to_markdown(cx).as_str());
 440            markdown.push_str("\n\n");
 441        }
 442        markdown
 443    }
 444
 445    async fn resolve_location(
 446        location: acp::ToolCallLocation,
 447        project: WeakEntity<Project>,
 448        cx: &mut AsyncApp,
 449    ) -> Option<ResolvedLocation> {
 450        let buffer = project
 451            .update(cx, |project, cx| {
 452                project
 453                    .project_path_for_absolute_path(&location.path, cx)
 454                    .map(|path| project.open_buffer(path, cx))
 455            })
 456            .ok()??;
 457        let buffer = buffer.await.log_err()?;
 458        let position = buffer.update(cx, |buffer, _| {
 459            let snapshot = buffer.snapshot();
 460            if let Some(row) = location.line {
 461                let column = snapshot.indent_size_for_line(row).len;
 462                let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 463                snapshot.anchor_before(point)
 464            } else {
 465                Anchor::min_for_buffer(snapshot.remote_id())
 466            }
 467        });
 468
 469        Some(ResolvedLocation { buffer, position })
 470    }
 471
 472    fn resolve_locations(
 473        &self,
 474        project: Entity<Project>,
 475        cx: &mut App,
 476    ) -> Task<Vec<Option<ResolvedLocation>>> {
 477        let locations = self.locations.clone();
 478        project.update(cx, |_, cx| {
 479            cx.spawn(async move |project, cx| {
 480                let mut new_locations = Vec::new();
 481                for location in locations {
 482                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 483                }
 484                new_locations
 485            })
 486        })
 487    }
 488}
 489
 490// Separate so we can hold a strong reference to the buffer
 491// for saving on the thread
 492#[derive(Clone, Debug, PartialEq, Eq)]
 493struct ResolvedLocation {
 494    buffer: Entity<Buffer>,
 495    position: Anchor,
 496}
 497
 498impl From<&ResolvedLocation> for AgentLocation {
 499    fn from(value: &ResolvedLocation) -> Self {
 500        Self {
 501            buffer: value.buffer.downgrade(),
 502            position: value.position,
 503        }
 504    }
 505}
 506
 507#[derive(Debug, Clone)]
 508pub enum SelectedPermissionParams {
 509    Terminal { patterns: Vec<String> },
 510}
 511
 512#[derive(Debug)]
 513pub struct SelectedPermissionOutcome {
 514    pub option_id: acp::PermissionOptionId,
 515    pub option_kind: acp::PermissionOptionKind,
 516    pub params: Option<SelectedPermissionParams>,
 517}
 518
 519impl SelectedPermissionOutcome {
 520    pub fn new(option_id: acp::PermissionOptionId, option_kind: acp::PermissionOptionKind) -> Self {
 521        Self {
 522            option_id,
 523            option_kind,
 524            params: None,
 525        }
 526    }
 527
 528    pub fn params(mut self, params: Option<SelectedPermissionParams>) -> Self {
 529        self.params = params;
 530        self
 531    }
 532}
 533
 534impl From<SelectedPermissionOutcome> for acp::SelectedPermissionOutcome {
 535    fn from(value: SelectedPermissionOutcome) -> Self {
 536        Self::new(value.option_id)
 537    }
 538}
 539
 540#[derive(Debug)]
 541pub enum RequestPermissionOutcome {
 542    Cancelled,
 543    Selected(SelectedPermissionOutcome),
 544}
 545
 546impl From<RequestPermissionOutcome> for acp::RequestPermissionOutcome {
 547    fn from(value: RequestPermissionOutcome) -> Self {
 548        match value {
 549            RequestPermissionOutcome::Cancelled => Self::Cancelled,
 550            RequestPermissionOutcome::Selected(outcome) => Self::Selected(outcome.into()),
 551        }
 552    }
 553}
 554
 555#[derive(Debug)]
 556pub enum ToolCallStatus {
 557    /// The tool call hasn't started running yet, but we start showing it to
 558    /// the user.
 559    Pending,
 560    /// The tool call is waiting for confirmation from the user.
 561    WaitingForConfirmation {
 562        options: PermissionOptions,
 563        respond_tx: oneshot::Sender<SelectedPermissionOutcome>,
 564    },
 565    /// The tool call is currently running.
 566    InProgress,
 567    /// The tool call completed successfully.
 568    Completed,
 569    /// The tool call failed.
 570    Failed,
 571    /// The user rejected the tool call.
 572    Rejected,
 573    /// The user canceled generation so the tool call was canceled.
 574    Canceled,
 575}
 576
 577impl From<acp::ToolCallStatus> for ToolCallStatus {
 578    fn from(status: acp::ToolCallStatus) -> Self {
 579        match status {
 580            acp::ToolCallStatus::Pending => Self::Pending,
 581            acp::ToolCallStatus::InProgress => Self::InProgress,
 582            acp::ToolCallStatus::Completed => Self::Completed,
 583            acp::ToolCallStatus::Failed => Self::Failed,
 584            _ => Self::Pending,
 585        }
 586    }
 587}
 588
 589impl Display for ToolCallStatus {
 590    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 591        write!(
 592            f,
 593            "{}",
 594            match self {
 595                ToolCallStatus::Pending => "Pending",
 596                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 597                ToolCallStatus::InProgress => "In Progress",
 598                ToolCallStatus::Completed => "Completed",
 599                ToolCallStatus::Failed => "Failed",
 600                ToolCallStatus::Rejected => "Rejected",
 601                ToolCallStatus::Canceled => "Canceled",
 602            }
 603        )
 604    }
 605}
 606
 607#[derive(Debug, PartialEq, Clone)]
 608pub enum ContentBlock {
 609    Empty,
 610    Markdown { markdown: Entity<Markdown> },
 611    ResourceLink { resource_link: acp::ResourceLink },
 612    Image { image: Arc<gpui::Image> },
 613}
 614
 615impl ContentBlock {
 616    pub fn new(
 617        block: acp::ContentBlock,
 618        language_registry: &Arc<LanguageRegistry>,
 619        path_style: PathStyle,
 620        cx: &mut App,
 621    ) -> Self {
 622        let mut this = Self::Empty;
 623        this.append(block, language_registry, path_style, cx);
 624        this
 625    }
 626
 627    pub fn new_combined(
 628        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 629        language_registry: Arc<LanguageRegistry>,
 630        path_style: PathStyle,
 631        cx: &mut App,
 632    ) -> Self {
 633        let mut this = Self::Empty;
 634        for block in blocks {
 635            this.append(block, &language_registry, path_style, cx);
 636        }
 637        this
 638    }
 639
 640    pub fn append(
 641        &mut self,
 642        block: acp::ContentBlock,
 643        language_registry: &Arc<LanguageRegistry>,
 644        path_style: PathStyle,
 645        cx: &mut App,
 646    ) {
 647        match (&mut *self, &block) {
 648            (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
 649                *self = ContentBlock::ResourceLink {
 650                    resource_link: resource_link.clone(),
 651                };
 652            }
 653            (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
 654                if let Some(image) = Self::decode_image(image_content) {
 655                    *self = ContentBlock::Image { image };
 656                } else {
 657                    let new_content = Self::image_md(image_content);
 658                    *self = Self::create_markdown_block(new_content, language_registry, cx);
 659                }
 660            }
 661            (ContentBlock::Empty, _) => {
 662                let new_content = Self::block_string_contents(&block, path_style);
 663                *self = Self::create_markdown_block(new_content, language_registry, cx);
 664            }
 665            (ContentBlock::Markdown { markdown }, _) => {
 666                let new_content = Self::block_string_contents(&block, path_style);
 667                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 668            }
 669            (ContentBlock::ResourceLink { resource_link }, _) => {
 670                let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
 671                let new_content = Self::block_string_contents(&block, path_style);
 672                let combined = format!("{}\n{}", existing_content, new_content);
 673                *self = Self::create_markdown_block(combined, language_registry, cx);
 674            }
 675            (ContentBlock::Image { .. }, _) => {
 676                let new_content = Self::block_string_contents(&block, path_style);
 677                let combined = format!("`Image`\n{}", new_content);
 678                *self = Self::create_markdown_block(combined, language_registry, cx);
 679            }
 680        }
 681    }
 682
 683    fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
 684        use base64::Engine as _;
 685
 686        let bytes = base64::engine::general_purpose::STANDARD
 687            .decode(image_content.data.as_bytes())
 688            .ok()?;
 689        let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
 690        Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
 691    }
 692
 693    fn create_markdown_block(
 694        content: String,
 695        language_registry: &Arc<LanguageRegistry>,
 696        cx: &mut App,
 697    ) -> ContentBlock {
 698        ContentBlock::Markdown {
 699            markdown: cx
 700                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 701        }
 702    }
 703
 704    fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
 705        match block {
 706            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 707            acp::ContentBlock::ResourceLink(resource_link) => {
 708                Self::resource_link_md(&resource_link.uri, path_style)
 709            }
 710            acp::ContentBlock::Resource(acp::EmbeddedResource {
 711                resource:
 712                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 713                        uri,
 714                        ..
 715                    }),
 716                ..
 717            }) => Self::resource_link_md(uri, path_style),
 718            acp::ContentBlock::Image(image) => Self::image_md(image),
 719            _ => String::new(),
 720        }
 721    }
 722
 723    fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
 724        if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
 725            uri.as_link().to_string()
 726        } else {
 727            uri.to_string()
 728        }
 729    }
 730
 731    fn image_md(_image: &acp::ImageContent) -> String {
 732        "`Image`".into()
 733    }
 734
 735    pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 736        match self {
 737            ContentBlock::Empty => "",
 738            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 739            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 740            ContentBlock::Image { .. } => "`Image`",
 741        }
 742    }
 743
 744    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 745        match self {
 746            ContentBlock::Empty => None,
 747            ContentBlock::Markdown { markdown } => Some(markdown),
 748            ContentBlock::ResourceLink { .. } => None,
 749            ContentBlock::Image { .. } => None,
 750        }
 751    }
 752
 753    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 754        match self {
 755            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 756            _ => None,
 757        }
 758    }
 759
 760    pub fn image(&self) -> Option<&Arc<gpui::Image>> {
 761        match self {
 762            ContentBlock::Image { image } => Some(image),
 763            _ => None,
 764        }
 765    }
 766}
 767
 768#[derive(Debug)]
 769pub enum ToolCallContent {
 770    ContentBlock(ContentBlock),
 771    Diff(Entity<Diff>),
 772    Terminal(Entity<Terminal>),
 773}
 774
 775impl ToolCallContent {
 776    pub fn from_acp(
 777        content: acp::ToolCallContent,
 778        language_registry: Arc<LanguageRegistry>,
 779        path_style: PathStyle,
 780        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 781        cx: &mut App,
 782    ) -> Result<Option<Self>> {
 783        match content {
 784            acp::ToolCallContent::Content(acp::Content { content, .. }) => {
 785                Ok(Some(Self::ContentBlock(ContentBlock::new(
 786                    content,
 787                    &language_registry,
 788                    path_style,
 789                    cx,
 790                ))))
 791            }
 792            acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
 793                Diff::finalized(
 794                    diff.path.to_string_lossy().into_owned(),
 795                    diff.old_text,
 796                    diff.new_text,
 797                    language_registry,
 798                    cx,
 799                )
 800            })))),
 801            acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
 802                .get(&terminal_id)
 803                .cloned()
 804                .map(|terminal| Some(Self::Terminal(terminal)))
 805                .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
 806            _ => Ok(None),
 807        }
 808    }
 809
 810    pub fn update_from_acp(
 811        &mut self,
 812        new: acp::ToolCallContent,
 813        language_registry: Arc<LanguageRegistry>,
 814        path_style: PathStyle,
 815        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 816        cx: &mut App,
 817    ) -> Result<bool> {
 818        let needs_update = match (&self, &new) {
 819            (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
 820                old_diff.read(cx).needs_update(
 821                    new_diff.old_text.as_deref().unwrap_or(""),
 822                    &new_diff.new_text,
 823                    cx,
 824                )
 825            }
 826            _ => true,
 827        };
 828
 829        if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
 830            if needs_update {
 831                *self = update;
 832            }
 833            Ok(true)
 834        } else {
 835            Ok(false)
 836        }
 837    }
 838
 839    pub fn to_markdown(&self, cx: &App) -> String {
 840        match self {
 841            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 842            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 843            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 844        }
 845    }
 846
 847    pub fn image(&self) -> Option<&Arc<gpui::Image>> {
 848        match self {
 849            Self::ContentBlock(content) => content.image(),
 850            _ => None,
 851        }
 852    }
 853}
 854
 855#[derive(Debug, PartialEq)]
 856pub enum ToolCallUpdate {
 857    UpdateFields(acp::ToolCallUpdate),
 858    UpdateDiff(ToolCallUpdateDiff),
 859    UpdateTerminal(ToolCallUpdateTerminal),
 860}
 861
 862impl ToolCallUpdate {
 863    fn id(&self) -> &acp::ToolCallId {
 864        match self {
 865            Self::UpdateFields(update) => &update.tool_call_id,
 866            Self::UpdateDiff(diff) => &diff.id,
 867            Self::UpdateTerminal(terminal) => &terminal.id,
 868        }
 869    }
 870}
 871
 872impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 873    fn from(update: acp::ToolCallUpdate) -> Self {
 874        Self::UpdateFields(update)
 875    }
 876}
 877
 878impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 879    fn from(diff: ToolCallUpdateDiff) -> Self {
 880        Self::UpdateDiff(diff)
 881    }
 882}
 883
 884#[derive(Debug, PartialEq)]
 885pub struct ToolCallUpdateDiff {
 886    pub id: acp::ToolCallId,
 887    pub diff: Entity<Diff>,
 888}
 889
 890impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 891    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 892        Self::UpdateTerminal(terminal)
 893    }
 894}
 895
 896#[derive(Debug, PartialEq)]
 897pub struct ToolCallUpdateTerminal {
 898    pub id: acp::ToolCallId,
 899    pub terminal: Entity<Terminal>,
 900}
 901
 902#[derive(Debug, Default)]
 903pub struct Plan {
 904    pub entries: Vec<PlanEntry>,
 905}
 906
 907#[derive(Debug)]
 908pub struct PlanStats<'a> {
 909    pub in_progress_entry: Option<&'a PlanEntry>,
 910    pub pending: u32,
 911    pub completed: u32,
 912}
 913
 914impl Plan {
 915    pub fn is_empty(&self) -> bool {
 916        self.entries.is_empty()
 917    }
 918
 919    pub fn stats(&self) -> PlanStats<'_> {
 920        let mut stats = PlanStats {
 921            in_progress_entry: None,
 922            pending: 0,
 923            completed: 0,
 924        };
 925
 926        for entry in &self.entries {
 927            match &entry.status {
 928                acp::PlanEntryStatus::Pending => {
 929                    stats.pending += 1;
 930                }
 931                acp::PlanEntryStatus::InProgress => {
 932                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 933                    stats.pending += 1;
 934                }
 935                acp::PlanEntryStatus::Completed => {
 936                    stats.completed += 1;
 937                }
 938                _ => {}
 939            }
 940        }
 941
 942        stats
 943    }
 944}
 945
 946#[derive(Debug)]
 947pub struct PlanEntry {
 948    pub content: Entity<Markdown>,
 949    pub priority: acp::PlanEntryPriority,
 950    pub status: acp::PlanEntryStatus,
 951}
 952
 953impl PlanEntry {
 954    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 955        Self {
 956            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 957            priority: entry.priority,
 958            status: entry.status,
 959        }
 960    }
 961}
 962
 963#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 964pub struct TokenUsage {
 965    pub max_tokens: u64,
 966    pub used_tokens: u64,
 967    pub input_tokens: u64,
 968    pub output_tokens: u64,
 969    pub max_output_tokens: Option<u64>,
 970}
 971
 972pub const TOKEN_USAGE_WARNING_THRESHOLD: f32 = 0.8;
 973
 974impl TokenUsage {
 975    pub fn ratio(&self) -> TokenUsageRatio {
 976        #[cfg(debug_assertions)]
 977        let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
 978            .unwrap_or(TOKEN_USAGE_WARNING_THRESHOLD.to_string())
 979            .parse()
 980            .unwrap();
 981        #[cfg(not(debug_assertions))]
 982        let warning_threshold: f32 = TOKEN_USAGE_WARNING_THRESHOLD;
 983
 984        // When the maximum is unknown because there is no selected model,
 985        // avoid showing the token limit warning.
 986        if self.max_tokens == 0 {
 987            TokenUsageRatio::Normal
 988        } else if self.used_tokens >= self.max_tokens {
 989            TokenUsageRatio::Exceeded
 990        } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
 991            TokenUsageRatio::Warning
 992        } else {
 993            TokenUsageRatio::Normal
 994        }
 995    }
 996}
 997
 998#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
 999pub enum TokenUsageRatio {
1000    Normal,
1001    Warning,
1002    Exceeded,
1003}
1004
1005#[derive(Debug, Clone)]
1006pub struct RetryStatus {
1007    pub last_error: SharedString,
1008    pub attempt: usize,
1009    pub max_attempts: usize,
1010    pub started_at: Instant,
1011    pub duration: Duration,
1012}
1013
1014struct RunningTurn {
1015    id: u32,
1016    send_task: Task<()>,
1017}
1018
1019pub struct AcpThread {
1020    session_id: acp::SessionId,
1021    work_dirs: Option<PathList>,
1022    parent_session_id: Option<acp::SessionId>,
1023    title: Option<SharedString>,
1024    provisional_title: Option<SharedString>,
1025    entries: Vec<AgentThreadEntry>,
1026    plan: Plan,
1027    project: Entity<Project>,
1028    action_log: Entity<ActionLog>,
1029    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
1030    turn_id: u32,
1031    running_turn: Option<RunningTurn>,
1032    connection: Rc<dyn AgentConnection>,
1033    token_usage: Option<TokenUsage>,
1034    prompt_capabilities: acp::PromptCapabilities,
1035    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
1036    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
1037    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
1038    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
1039    had_error: bool,
1040    /// The user's unsent prompt text, persisted so it can be restored when reloading the thread.
1041    draft_prompt: Option<Vec<acp::ContentBlock>>,
1042    /// The initial scroll position for the thread view, set during session registration.
1043    ui_scroll_position: Option<gpui::ListOffset>,
1044    /// Buffer for smooth text streaming. Holds text that has been received from
1045    /// the model but not yet revealed in the UI. A timer task drains this buffer
1046    /// gradually to create a fluid typing effect instead of choppy chunk-at-a-time
1047    /// updates.
1048    streaming_text_buffer: Option<StreamingTextBuffer>,
1049}
1050
1051struct StreamingTextBuffer {
1052    /// Text received from the model but not yet appended to the Markdown source.
1053    pending: String,
1054    /// The number of bytes to reveal per timer turn.
1055    bytes_to_reveal_per_tick: usize,
1056    /// The Markdown entity being streamed into.
1057    target: Entity<Markdown>,
1058    /// Timer task that periodically moves text from `pending` into `source`.
1059    _reveal_task: Task<()>,
1060}
1061
1062impl StreamingTextBuffer {
1063    /// The number of milliseconds between each timer tick, controlling how quickly
1064    /// text is revealed.
1065    const TASK_UPDATE_MS: u64 = 16;
1066    /// The time in milliseconds to reveal the entire pending text.
1067    const REVEAL_TARGET: f32 = 200.0;
1068}
1069
1070impl From<&AcpThread> for ActionLogTelemetry {
1071    fn from(value: &AcpThread) -> Self {
1072        Self {
1073            agent_telemetry_id: value.connection().telemetry_id(),
1074            session_id: value.session_id.0.clone(),
1075        }
1076    }
1077}
1078
1079#[derive(Debug)]
1080pub enum AcpThreadEvent {
1081    NewEntry,
1082    TitleUpdated,
1083    TokenUsageUpdated,
1084    EntryUpdated(usize),
1085    EntriesRemoved(Range<usize>),
1086    ToolAuthorizationRequested(acp::ToolCallId),
1087    ToolAuthorizationReceived(acp::ToolCallId),
1088    Retry(RetryStatus),
1089    SubagentSpawned(acp::SessionId),
1090    Stopped(acp::StopReason),
1091    Error,
1092    LoadError(LoadError),
1093    PromptCapabilitiesUpdated,
1094    Refusal,
1095    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
1096    ModeUpdated(acp::SessionModeId),
1097    ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
1098}
1099
1100impl EventEmitter<AcpThreadEvent> for AcpThread {}
1101
1102#[derive(Debug, Clone)]
1103pub enum TerminalProviderEvent {
1104    Created {
1105        terminal_id: acp::TerminalId,
1106        label: String,
1107        cwd: Option<PathBuf>,
1108        output_byte_limit: Option<u64>,
1109        terminal: Entity<::terminal::Terminal>,
1110    },
1111    Output {
1112        terminal_id: acp::TerminalId,
1113        data: Vec<u8>,
1114    },
1115    TitleChanged {
1116        terminal_id: acp::TerminalId,
1117        title: String,
1118    },
1119    Exit {
1120        terminal_id: acp::TerminalId,
1121        status: acp::TerminalExitStatus,
1122    },
1123}
1124
1125#[derive(Debug, Clone)]
1126pub enum TerminalProviderCommand {
1127    WriteInput {
1128        terminal_id: acp::TerminalId,
1129        bytes: Vec<u8>,
1130    },
1131    Resize {
1132        terminal_id: acp::TerminalId,
1133        cols: u16,
1134        rows: u16,
1135    },
1136    Close {
1137        terminal_id: acp::TerminalId,
1138    },
1139}
1140
1141#[derive(PartialEq, Eq, Debug)]
1142pub enum ThreadStatus {
1143    Idle,
1144    Generating,
1145}
1146
1147#[derive(Debug, Clone)]
1148pub enum LoadError {
1149    Unsupported {
1150        command: SharedString,
1151        current_version: SharedString,
1152        minimum_version: SharedString,
1153    },
1154    FailedToInstall(SharedString),
1155    Exited {
1156        status: ExitStatus,
1157    },
1158    Other(SharedString),
1159}
1160
1161impl Display for LoadError {
1162    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1163        match self {
1164            LoadError::Unsupported {
1165                command: path,
1166                current_version,
1167                minimum_version,
1168            } => {
1169                write!(
1170                    f,
1171                    "version {current_version} from {path} is not supported (need at least {minimum_version})"
1172                )
1173            }
1174            LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1175            LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1176            LoadError::Other(msg) => write!(f, "{msg}"),
1177        }
1178    }
1179}
1180
1181impl Error for LoadError {}
1182
1183impl AcpThread {
1184    pub fn new(
1185        parent_session_id: Option<acp::SessionId>,
1186        title: Option<SharedString>,
1187        work_dirs: Option<PathList>,
1188        connection: Rc<dyn AgentConnection>,
1189        project: Entity<Project>,
1190        action_log: Entity<ActionLog>,
1191        session_id: acp::SessionId,
1192        mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1193        cx: &mut Context<Self>,
1194    ) -> Self {
1195        let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1196        let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1197            loop {
1198                let caps = prompt_capabilities_rx.recv().await?;
1199                this.update(cx, |this, cx| {
1200                    this.prompt_capabilities = caps;
1201                    cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1202                })?;
1203            }
1204        });
1205
1206        Self {
1207            parent_session_id,
1208            work_dirs,
1209            action_log,
1210            shared_buffers: Default::default(),
1211            entries: Default::default(),
1212            plan: Default::default(),
1213            title,
1214            provisional_title: None,
1215            project,
1216            running_turn: None,
1217            turn_id: 0,
1218            connection,
1219            session_id,
1220            token_usage: None,
1221            prompt_capabilities,
1222            _observe_prompt_capabilities: task,
1223            terminals: HashMap::default(),
1224            pending_terminal_output: HashMap::default(),
1225            pending_terminal_exit: HashMap::default(),
1226            had_error: false,
1227            draft_prompt: None,
1228            ui_scroll_position: None,
1229            streaming_text_buffer: None,
1230        }
1231    }
1232
1233    pub fn parent_session_id(&self) -> Option<&acp::SessionId> {
1234        self.parent_session_id.as_ref()
1235    }
1236
1237    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1238        self.prompt_capabilities.clone()
1239    }
1240
1241    pub fn draft_prompt(&self) -> Option<&[acp::ContentBlock]> {
1242        self.draft_prompt.as_deref()
1243    }
1244
1245    pub fn set_draft_prompt(&mut self, prompt: Option<Vec<acp::ContentBlock>>) {
1246        self.draft_prompt = prompt;
1247    }
1248
1249    pub fn ui_scroll_position(&self) -> Option<gpui::ListOffset> {
1250        self.ui_scroll_position
1251    }
1252
1253    pub fn set_ui_scroll_position(&mut self, position: Option<gpui::ListOffset>) {
1254        self.ui_scroll_position = position;
1255    }
1256
1257    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1258        &self.connection
1259    }
1260
1261    pub fn action_log(&self) -> &Entity<ActionLog> {
1262        &self.action_log
1263    }
1264
1265    pub fn project(&self) -> &Entity<Project> {
1266        &self.project
1267    }
1268
1269    pub fn title(&self) -> Option<SharedString> {
1270        self.title
1271            .clone()
1272            .or_else(|| self.provisional_title.clone())
1273    }
1274
1275    pub fn has_provisional_title(&self) -> bool {
1276        self.provisional_title.is_some()
1277    }
1278
1279    pub fn entries(&self) -> &[AgentThreadEntry] {
1280        &self.entries
1281    }
1282
1283    pub fn session_id(&self) -> &acp::SessionId {
1284        &self.session_id
1285    }
1286
1287    pub fn work_dirs(&self) -> Option<&PathList> {
1288        self.work_dirs.as_ref()
1289    }
1290
1291    pub fn status(&self) -> ThreadStatus {
1292        if self.running_turn.is_some() {
1293            ThreadStatus::Generating
1294        } else {
1295            ThreadStatus::Idle
1296        }
1297    }
1298
1299    pub fn had_error(&self) -> bool {
1300        self.had_error
1301    }
1302
1303    pub fn is_waiting_for_confirmation(&self) -> bool {
1304        for entry in self.entries.iter().rev() {
1305            match entry {
1306                AgentThreadEntry::UserMessage(_) => return false,
1307                AgentThreadEntry::ToolCall(ToolCall {
1308                    status: ToolCallStatus::WaitingForConfirmation { .. },
1309                    ..
1310                }) => return true,
1311                AgentThreadEntry::ToolCall(_)
1312                | AgentThreadEntry::AssistantMessage(_)
1313                | AgentThreadEntry::CompletedPlan(_) => {}
1314            }
1315        }
1316        false
1317    }
1318
1319    pub fn token_usage(&self) -> Option<&TokenUsage> {
1320        self.token_usage.as_ref()
1321    }
1322
1323    pub fn has_pending_edit_tool_calls(&self) -> bool {
1324        for entry in self.entries.iter().rev() {
1325            match entry {
1326                AgentThreadEntry::UserMessage(_) => return false,
1327                AgentThreadEntry::ToolCall(
1328                    call @ ToolCall {
1329                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1330                        ..
1331                    },
1332                ) if call.diffs().next().is_some() => {
1333                    return true;
1334                }
1335                AgentThreadEntry::ToolCall(_)
1336                | AgentThreadEntry::AssistantMessage(_)
1337                | AgentThreadEntry::CompletedPlan(_) => {}
1338            }
1339        }
1340
1341        false
1342    }
1343
1344    pub fn has_in_progress_tool_calls(&self) -> bool {
1345        for entry in self.entries.iter().rev() {
1346            match entry {
1347                AgentThreadEntry::UserMessage(_) => return false,
1348                AgentThreadEntry::ToolCall(ToolCall {
1349                    status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1350                    ..
1351                }) => {
1352                    return true;
1353                }
1354                AgentThreadEntry::ToolCall(_)
1355                | AgentThreadEntry::AssistantMessage(_)
1356                | AgentThreadEntry::CompletedPlan(_) => {}
1357            }
1358        }
1359
1360        false
1361    }
1362
1363    pub fn used_tools_since_last_user_message(&self) -> bool {
1364        for entry in self.entries.iter().rev() {
1365            match entry {
1366                AgentThreadEntry::UserMessage(..) => return false,
1367                AgentThreadEntry::AssistantMessage(..) | AgentThreadEntry::CompletedPlan(..) => {
1368                    continue;
1369                }
1370                AgentThreadEntry::ToolCall(..) => return true,
1371            }
1372        }
1373
1374        false
1375    }
1376
1377    pub fn handle_session_update(
1378        &mut self,
1379        update: acp::SessionUpdate,
1380        cx: &mut Context<Self>,
1381    ) -> Result<(), acp::Error> {
1382        match update {
1383            acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1384                self.push_user_content_block(None, content, cx);
1385            }
1386            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1387                self.push_assistant_content_block(content, false, cx);
1388            }
1389            acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1390                self.push_assistant_content_block(content, true, cx);
1391            }
1392            acp::SessionUpdate::ToolCall(tool_call) => {
1393                self.upsert_tool_call(tool_call, cx)?;
1394            }
1395            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1396                self.update_tool_call(tool_call_update, cx)?;
1397            }
1398            acp::SessionUpdate::Plan(plan) => {
1399                self.update_plan(plan, cx);
1400            }
1401            acp::SessionUpdate::SessionInfoUpdate(info_update) => {
1402                if let acp::MaybeUndefined::Value(title) = info_update.title {
1403                    let had_provisional = self.provisional_title.take().is_some();
1404                    let title: SharedString = title.into();
1405                    if self.title.as_ref() != Some(&title) {
1406                        self.title = Some(title);
1407                        cx.emit(AcpThreadEvent::TitleUpdated);
1408                    } else if had_provisional {
1409                        cx.emit(AcpThreadEvent::TitleUpdated);
1410                    }
1411                }
1412            }
1413            acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1414                available_commands,
1415                ..
1416            }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1417            acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1418                current_mode_id,
1419                ..
1420            }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1421            acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1422                config_options,
1423                ..
1424            }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1425            _ => {}
1426        }
1427        Ok(())
1428    }
1429
1430    pub fn push_user_content_block(
1431        &mut self,
1432        message_id: Option<UserMessageId>,
1433        chunk: acp::ContentBlock,
1434        cx: &mut Context<Self>,
1435    ) {
1436        self.push_user_content_block_with_indent(message_id, chunk, false, cx)
1437    }
1438
1439    pub fn push_user_content_block_with_indent(
1440        &mut self,
1441        message_id: Option<UserMessageId>,
1442        chunk: acp::ContentBlock,
1443        indented: bool,
1444        cx: &mut Context<Self>,
1445    ) {
1446        let language_registry = self.project.read(cx).languages().clone();
1447        let path_style = self.project.read(cx).path_style(cx);
1448        let entries_len = self.entries.len();
1449
1450        if let Some(last_entry) = self.entries.last_mut()
1451            && let AgentThreadEntry::UserMessage(UserMessage {
1452                id,
1453                content,
1454                chunks,
1455                indented: existing_indented,
1456                ..
1457            }) = last_entry
1458            && *existing_indented == indented
1459        {
1460            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1461            *id = message_id.or(id.take());
1462            content.append(chunk.clone(), &language_registry, path_style, cx);
1463            chunks.push(chunk);
1464            let idx = entries_len - 1;
1465            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1466        } else {
1467            let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1468            self.push_entry(
1469                AgentThreadEntry::UserMessage(UserMessage {
1470                    id: message_id,
1471                    content,
1472                    chunks: vec![chunk],
1473                    checkpoint: None,
1474                    indented,
1475                }),
1476                cx,
1477            );
1478        }
1479    }
1480
1481    pub fn push_assistant_content_block(
1482        &mut self,
1483        chunk: acp::ContentBlock,
1484        is_thought: bool,
1485        cx: &mut Context<Self>,
1486    ) {
1487        self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
1488    }
1489
1490    pub fn push_assistant_content_block_with_indent(
1491        &mut self,
1492        chunk: acp::ContentBlock,
1493        is_thought: bool,
1494        indented: bool,
1495        cx: &mut Context<Self>,
1496    ) {
1497        let path_style = self.project.read(cx).path_style(cx);
1498
1499        // For text chunks going to an existing Markdown block, buffer for smooth
1500        // streaming instead of appending all at once which may feel more choppy.
1501        if let acp::ContentBlock::Text(text_content) = &chunk {
1502            if let Some(markdown) = self.streaming_markdown_target(is_thought, indented) {
1503                let entries_len = self.entries.len();
1504                cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
1505                self.buffer_streaming_text(&markdown, text_content.text.clone(), cx);
1506                return;
1507            }
1508        }
1509
1510        let language_registry = self.project.read(cx).languages().clone();
1511        let entries_len = self.entries.len();
1512        if let Some(last_entry) = self.entries.last_mut()
1513            && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1514                chunks,
1515                indented: existing_indented,
1516                is_subagent_output: _,
1517            }) = last_entry
1518            && *existing_indented == indented
1519        {
1520            let idx = entries_len - 1;
1521            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1522            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1523            match (chunks.last_mut(), is_thought) {
1524                (Some(AssistantMessageChunk::Message { block }), false)
1525                | (Some(AssistantMessageChunk::Thought { block }), true) => {
1526                    block.append(chunk, &language_registry, path_style, cx)
1527                }
1528                _ => {
1529                    let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1530                    if is_thought {
1531                        chunks.push(AssistantMessageChunk::Thought { block })
1532                    } else {
1533                        chunks.push(AssistantMessageChunk::Message { block })
1534                    }
1535                }
1536            }
1537        } else {
1538            let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1539            let chunk = if is_thought {
1540                AssistantMessageChunk::Thought { block }
1541            } else {
1542                AssistantMessageChunk::Message { block }
1543            };
1544
1545            self.push_entry(
1546                AgentThreadEntry::AssistantMessage(AssistantMessage {
1547                    chunks: vec![chunk],
1548                    indented,
1549                    is_subagent_output: false,
1550                }),
1551                cx,
1552            );
1553        }
1554    }
1555
1556    fn streaming_markdown_target(
1557        &self,
1558        is_thought: bool,
1559        indented: bool,
1560    ) -> Option<Entity<Markdown>> {
1561        let last_entry = self.entries.last()?;
1562        if let AgentThreadEntry::AssistantMessage(AssistantMessage {
1563            chunks,
1564            indented: existing_indented,
1565            ..
1566        }) = last_entry
1567            && *existing_indented == indented
1568            && let [.., chunk] = chunks.as_slice()
1569        {
1570            match (chunk, is_thought) {
1571                (
1572                    AssistantMessageChunk::Message {
1573                        block: ContentBlock::Markdown { markdown },
1574                    },
1575                    false,
1576                )
1577                | (
1578                    AssistantMessageChunk::Thought {
1579                        block: ContentBlock::Markdown { markdown },
1580                    },
1581                    true,
1582                ) => Some(markdown.clone()),
1583                _ => None,
1584            }
1585        } else {
1586            None
1587        }
1588    }
1589
1590    /// Add text to the streaming buffer. If the target changed (e.g. switching
1591    /// from thoughts to message text), flush the old buffer first.
1592    fn buffer_streaming_text(
1593        &mut self,
1594        markdown: &Entity<Markdown>,
1595        text: String,
1596        cx: &mut Context<Self>,
1597    ) {
1598        if let Some(buffer) = &mut self.streaming_text_buffer {
1599            if buffer.target.entity_id() == markdown.entity_id() {
1600                buffer.pending.push_str(&text);
1601
1602                buffer.bytes_to_reveal_per_tick = (buffer.pending.len() as f32
1603                    / StreamingTextBuffer::REVEAL_TARGET
1604                    * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1605                    .ceil() as usize;
1606                return;
1607            }
1608            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1609        }
1610
1611        let target = markdown.clone();
1612        let _reveal_task = self.start_streaming_reveal(cx);
1613        let pending_len = text.len();
1614        let bytes_to_reveal = (pending_len as f32 / StreamingTextBuffer::REVEAL_TARGET
1615            * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1616            .ceil() as usize;
1617        self.streaming_text_buffer = Some(StreamingTextBuffer {
1618            pending: text,
1619            bytes_to_reveal_per_tick: bytes_to_reveal,
1620            target,
1621            _reveal_task,
1622        });
1623    }
1624
1625    /// Flush all buffered streaming text into the Markdown entity immediately.
1626    fn flush_streaming_text(
1627        streaming_text_buffer: &mut Option<StreamingTextBuffer>,
1628        cx: &mut Context<Self>,
1629    ) {
1630        if let Some(buffer) = streaming_text_buffer.take() {
1631            if !buffer.pending.is_empty() {
1632                buffer
1633                    .target
1634                    .update(cx, |markdown, cx| markdown.append(&buffer.pending, cx));
1635            }
1636        }
1637    }
1638
1639    /// Spawns a foreground task that periodically drains
1640    /// `streaming_text_buffer.pending` into the target `Markdown` entity,
1641    /// producing smooth, continuous text output.
1642    fn start_streaming_reveal(&self, cx: &mut Context<Self>) -> Task<()> {
1643        cx.spawn(async move |this, cx| {
1644            loop {
1645                cx.background_executor()
1646                    .timer(Duration::from_millis(StreamingTextBuffer::TASK_UPDATE_MS))
1647                    .await;
1648
1649                let should_continue = this
1650                    .update(cx, |this, cx| {
1651                        let Some(buffer) = &mut this.streaming_text_buffer else {
1652                            return false;
1653                        };
1654
1655                        if buffer.pending.is_empty() {
1656                            return true;
1657                        }
1658
1659                        let pending_len = buffer.pending.len();
1660
1661                        let byte_boundary = buffer
1662                            .pending
1663                            .ceil_char_boundary(buffer.bytes_to_reveal_per_tick)
1664                            .min(pending_len);
1665
1666                        buffer.target.update(cx, |markdown: &mut Markdown, cx| {
1667                            markdown.append(&buffer.pending[..byte_boundary], cx);
1668                            buffer.pending.drain(..byte_boundary);
1669                        });
1670
1671                        true
1672                    })
1673                    .unwrap_or(false);
1674
1675                if !should_continue {
1676                    break;
1677                }
1678            }
1679        })
1680    }
1681
1682    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1683        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1684        self.entries.push(entry);
1685        cx.emit(AcpThreadEvent::NewEntry);
1686    }
1687
1688    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1689        self.connection.set_title(&self.session_id, cx).is_some()
1690    }
1691
1692    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1693        let had_provisional = self.provisional_title.take().is_some();
1694        if self.title.as_ref() != Some(&title) {
1695            self.title = Some(title.clone());
1696            cx.emit(AcpThreadEvent::TitleUpdated);
1697            if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1698                return set_title.run(title, cx);
1699            }
1700        } else if had_provisional {
1701            cx.emit(AcpThreadEvent::TitleUpdated);
1702        }
1703        Task::ready(Ok(()))
1704    }
1705
1706    /// Sets a provisional display title without propagating back to the
1707    /// underlying agent connection. This is used for quick preview titles
1708    /// (e.g. first 20 chars of the user message) that should be shown
1709    /// immediately but replaced once the LLM generates a proper title via
1710    /// `set_title`.
1711    pub fn set_provisional_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
1712        self.provisional_title = Some(title);
1713        cx.emit(AcpThreadEvent::TitleUpdated);
1714    }
1715
1716    pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
1717        cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
1718    }
1719
1720    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1721        self.token_usage = usage;
1722        cx.emit(AcpThreadEvent::TokenUsageUpdated);
1723    }
1724
1725    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1726        cx.emit(AcpThreadEvent::Retry(status));
1727    }
1728
1729    pub fn update_tool_call(
1730        &mut self,
1731        update: impl Into<ToolCallUpdate>,
1732        cx: &mut Context<Self>,
1733    ) -> Result<()> {
1734        let update = update.into();
1735        let languages = self.project.read(cx).languages().clone();
1736        let path_style = self.project.read(cx).path_style(cx);
1737
1738        let ix = match self.index_for_tool_call(update.id()) {
1739            Some(ix) => ix,
1740            None => {
1741                // Tool call not found - create a failed tool call entry
1742                let failed_tool_call = ToolCall {
1743                    id: update.id().clone(),
1744                    label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1745                    kind: acp::ToolKind::Fetch,
1746                    content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1747                        "Tool call not found".into(),
1748                        &languages,
1749                        path_style,
1750                        cx,
1751                    ))],
1752                    status: ToolCallStatus::Failed,
1753                    locations: Vec::new(),
1754                    resolved_locations: Vec::new(),
1755                    raw_input: None,
1756                    raw_input_markdown: None,
1757                    raw_output: None,
1758                    tool_name: None,
1759                    subagent_session_info: None,
1760                };
1761                self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1762                return Ok(());
1763            }
1764        };
1765        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1766            unreachable!()
1767        };
1768
1769        match update {
1770            ToolCallUpdate::UpdateFields(update) => {
1771                let location_updated = update.fields.locations.is_some();
1772                call.update_fields(
1773                    update.fields,
1774                    update.meta,
1775                    languages,
1776                    path_style,
1777                    &self.terminals,
1778                    cx,
1779                )?;
1780                if location_updated {
1781                    self.resolve_locations(update.tool_call_id, cx);
1782                }
1783            }
1784            ToolCallUpdate::UpdateDiff(update) => {
1785                call.content.clear();
1786                call.content.push(ToolCallContent::Diff(update.diff));
1787            }
1788            ToolCallUpdate::UpdateTerminal(update) => {
1789                call.content.clear();
1790                call.content
1791                    .push(ToolCallContent::Terminal(update.terminal));
1792            }
1793        }
1794
1795        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1796
1797        Ok(())
1798    }
1799
1800    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1801    pub fn upsert_tool_call(
1802        &mut self,
1803        tool_call: acp::ToolCall,
1804        cx: &mut Context<Self>,
1805    ) -> Result<(), acp::Error> {
1806        let status = tool_call.status.into();
1807        self.upsert_tool_call_inner(tool_call.into(), status, cx)
1808    }
1809
1810    /// Fails if id does not match an existing entry.
1811    pub fn upsert_tool_call_inner(
1812        &mut self,
1813        update: acp::ToolCallUpdate,
1814        status: ToolCallStatus,
1815        cx: &mut Context<Self>,
1816    ) -> Result<(), acp::Error> {
1817        let language_registry = self.project.read(cx).languages().clone();
1818        let path_style = self.project.read(cx).path_style(cx);
1819        let id = update.tool_call_id.clone();
1820
1821        let agent_telemetry_id = self.connection().telemetry_id();
1822        let session = self.session_id();
1823        let parent_session_id = self.parent_session_id();
1824        if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1825            let status = if matches!(status, ToolCallStatus::Completed) {
1826                "completed"
1827            } else {
1828                "failed"
1829            };
1830            telemetry::event!(
1831                "Agent Tool Call Completed",
1832                agent_telemetry_id,
1833                session,
1834                parent_session_id,
1835                status
1836            );
1837        }
1838
1839        if let Some(ix) = self.index_for_tool_call(&id) {
1840            let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1841                unreachable!()
1842            };
1843
1844            call.update_fields(
1845                update.fields,
1846                update.meta,
1847                language_registry,
1848                path_style,
1849                &self.terminals,
1850                cx,
1851            )?;
1852            call.status = status;
1853
1854            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1855        } else {
1856            let call = ToolCall::from_acp(
1857                update.try_into()?,
1858                status,
1859                language_registry,
1860                self.project.read(cx).path_style(cx),
1861                &self.terminals,
1862                cx,
1863            )?;
1864            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1865        };
1866
1867        self.resolve_locations(id, cx);
1868        Ok(())
1869    }
1870
1871    fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1872        self.entries
1873            .iter()
1874            .enumerate()
1875            .rev()
1876            .find_map(|(index, entry)| {
1877                if let AgentThreadEntry::ToolCall(tool_call) = entry
1878                    && &tool_call.id == id
1879                {
1880                    Some(index)
1881                } else {
1882                    None
1883                }
1884            })
1885    }
1886
1887    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1888        // The tool call we are looking for is typically the last one, or very close to the end.
1889        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1890        self.entries
1891            .iter_mut()
1892            .enumerate()
1893            .rev()
1894            .find_map(|(index, tool_call)| {
1895                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1896                    && &tool_call.id == id
1897                {
1898                    Some((index, tool_call))
1899                } else {
1900                    None
1901                }
1902            })
1903    }
1904
1905    pub fn tool_call(&self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1906        self.entries
1907            .iter()
1908            .enumerate()
1909            .rev()
1910            .find_map(|(index, tool_call)| {
1911                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1912                    && &tool_call.id == id
1913                {
1914                    Some((index, tool_call))
1915                } else {
1916                    None
1917                }
1918            })
1919    }
1920
1921    pub fn tool_call_for_subagent(&self, session_id: &acp::SessionId) -> Option<&ToolCall> {
1922        self.entries.iter().find_map(|entry| match entry {
1923            AgentThreadEntry::ToolCall(tool_call) => {
1924                if let Some(subagent_session_info) = &tool_call.subagent_session_info
1925                    && &subagent_session_info.session_id == session_id
1926                {
1927                    Some(tool_call)
1928                } else {
1929                    None
1930                }
1931            }
1932            _ => None,
1933        })
1934    }
1935
1936    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1937        let project = self.project.clone();
1938        let should_update_agent_location = self.parent_session_id.is_none();
1939        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1940            return;
1941        };
1942        let task = tool_call.resolve_locations(project, cx);
1943        cx.spawn(async move |this, cx| {
1944            let resolved_locations = task.await;
1945
1946            this.update(cx, |this, cx| {
1947                let project = this.project.clone();
1948
1949                for location in resolved_locations.iter().flatten() {
1950                    this.shared_buffers
1951                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1952                }
1953                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1954                    return;
1955                };
1956
1957                if let Some(Some(location)) = resolved_locations.last() {
1958                    project.update(cx, |project, cx| {
1959                        let should_ignore = if let Some(agent_location) = project
1960                            .agent_location()
1961                            .filter(|agent_location| agent_location.buffer == location.buffer)
1962                        {
1963                            let snapshot = location.buffer.read(cx).snapshot();
1964                            let old_position = agent_location.position.to_point(&snapshot);
1965                            let new_position = location.position.to_point(&snapshot);
1966
1967                            // ignore this so that when we get updates from the edit tool
1968                            // the position doesn't reset to the startof line
1969                            old_position.row == new_position.row
1970                                && old_position.column > new_position.column
1971                        } else {
1972                            false
1973                        };
1974                        if !should_ignore && should_update_agent_location {
1975                            project.set_agent_location(Some(location.into()), cx);
1976                        }
1977                    });
1978                }
1979
1980                let resolved_locations = resolved_locations
1981                    .iter()
1982                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
1983                    .collect::<Vec<_>>();
1984
1985                if tool_call.resolved_locations != resolved_locations {
1986                    tool_call.resolved_locations = resolved_locations;
1987                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1988                }
1989            })
1990        })
1991        .detach();
1992    }
1993
1994    pub fn request_tool_call_authorization(
1995        &mut self,
1996        tool_call: acp::ToolCallUpdate,
1997        options: PermissionOptions,
1998        cx: &mut Context<Self>,
1999    ) -> Result<Task<RequestPermissionOutcome>> {
2000        let (tx, rx) = oneshot::channel();
2001
2002        let status = ToolCallStatus::WaitingForConfirmation {
2003            options,
2004            respond_tx: tx,
2005        };
2006
2007        let tool_call_id = tool_call.tool_call_id.clone();
2008        self.upsert_tool_call_inner(tool_call, status, cx)?;
2009        cx.emit(AcpThreadEvent::ToolAuthorizationRequested(
2010            tool_call_id.clone(),
2011        ));
2012
2013        Ok(cx.spawn(async move |this, cx| {
2014            let outcome = match rx.await {
2015                Ok(outcome) => RequestPermissionOutcome::Selected(outcome),
2016                Err(oneshot::Canceled) => RequestPermissionOutcome::Cancelled,
2017            };
2018            this.update(cx, |_this, cx| {
2019                cx.emit(AcpThreadEvent::ToolAuthorizationReceived(tool_call_id))
2020            })
2021            .ok();
2022            outcome
2023        }))
2024    }
2025
2026    pub fn authorize_tool_call(
2027        &mut self,
2028        id: acp::ToolCallId,
2029        outcome: SelectedPermissionOutcome,
2030        cx: &mut Context<Self>,
2031    ) {
2032        let Some((ix, call)) = self.tool_call_mut(&id) else {
2033            return;
2034        };
2035
2036        let new_status = match outcome.option_kind {
2037            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
2038                ToolCallStatus::Rejected
2039            }
2040            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
2041                ToolCallStatus::InProgress
2042            }
2043            _ => ToolCallStatus::InProgress,
2044        };
2045
2046        let curr_status = mem::replace(&mut call.status, new_status);
2047
2048        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
2049            respond_tx.send(outcome).log_err();
2050        } else if cfg!(debug_assertions) {
2051            panic!("tried to authorize an already authorized tool call");
2052        }
2053
2054        cx.emit(AcpThreadEvent::EntryUpdated(ix));
2055    }
2056
2057    pub fn plan(&self) -> &Plan {
2058        &self.plan
2059    }
2060
2061    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
2062        let new_entries_len = request.entries.len();
2063        let mut new_entries = request.entries.into_iter();
2064
2065        // Reuse existing markdown to prevent flickering
2066        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
2067            let PlanEntry {
2068                content,
2069                priority,
2070                status,
2071            } = old;
2072            content.update(cx, |old, cx| {
2073                old.replace(new.content, cx);
2074            });
2075            *priority = new.priority;
2076            *status = new.status;
2077        }
2078        for new in new_entries {
2079            self.plan.entries.push(PlanEntry::from_acp(new, cx))
2080        }
2081        self.plan.entries.truncate(new_entries_len);
2082
2083        cx.notify();
2084    }
2085
2086    pub fn snapshot_completed_plan(&mut self, cx: &mut Context<Self>) {
2087        if !self.plan.is_empty() && self.plan.stats().pending == 0 {
2088            let completed_entries = std::mem::take(&mut self.plan.entries);
2089            self.push_entry(AgentThreadEntry::CompletedPlan(completed_entries), cx);
2090        }
2091    }
2092
2093    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
2094        self.plan
2095            .entries
2096            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
2097        cx.notify();
2098    }
2099
2100    pub fn clear_plan(&mut self, cx: &mut Context<Self>) {
2101        self.plan.entries.clear();
2102        cx.notify();
2103    }
2104
2105    #[cfg(any(test, feature = "test-support"))]
2106    pub fn send_raw(
2107        &mut self,
2108        message: &str,
2109        cx: &mut Context<Self>,
2110    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2111        self.send(vec![message.into()], cx)
2112    }
2113
2114    pub fn send(
2115        &mut self,
2116        message: Vec<acp::ContentBlock>,
2117        cx: &mut Context<Self>,
2118    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2119        let block = ContentBlock::new_combined(
2120            message.clone(),
2121            self.project.read(cx).languages().clone(),
2122            self.project.read(cx).path_style(cx),
2123            cx,
2124        );
2125        let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
2126        let git_store = self.project.read(cx).git_store().clone();
2127
2128        let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
2129            Some(UserMessageId::new())
2130        } else {
2131            None
2132        };
2133
2134        self.run_turn(cx, async move |this, cx| {
2135            this.update(cx, |this, cx| {
2136                this.push_entry(
2137                    AgentThreadEntry::UserMessage(UserMessage {
2138                        id: message_id.clone(),
2139                        content: block,
2140                        chunks: message,
2141                        checkpoint: None,
2142                        indented: false,
2143                    }),
2144                    cx,
2145                );
2146            })
2147            .ok();
2148
2149            let old_checkpoint = git_store
2150                .update(cx, |git, cx| git.checkpoint(cx))
2151                .await
2152                .context("failed to get old checkpoint")
2153                .log_err();
2154            this.update(cx, |this, cx| {
2155                if let Some((_ix, message)) = this.last_user_message() {
2156                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
2157                        git_checkpoint,
2158                        show: false,
2159                    });
2160                }
2161                this.connection.prompt(message_id, request, cx)
2162            })?
2163            .await
2164        })
2165    }
2166
2167    pub fn can_retry(&self, cx: &App) -> bool {
2168        self.connection.retry(&self.session_id, cx).is_some()
2169    }
2170
2171    pub fn retry(
2172        &mut self,
2173        cx: &mut Context<Self>,
2174    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2175        self.run_turn(cx, async move |this, cx| {
2176            this.update(cx, |this, cx| {
2177                this.connection
2178                    .retry(&this.session_id, cx)
2179                    .map(|retry| retry.run(cx))
2180            })?
2181            .context("retrying a session is not supported")?
2182            .await
2183        })
2184    }
2185
2186    fn run_turn(
2187        &mut self,
2188        cx: &mut Context<Self>,
2189        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
2190    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2191        self.clear_completed_plan_entries(cx);
2192        self.had_error = false;
2193
2194        let (tx, rx) = oneshot::channel();
2195        let cancel_task = self.cancel(cx);
2196
2197        self.turn_id += 1;
2198        let turn_id = self.turn_id;
2199        self.running_turn = Some(RunningTurn {
2200            id: turn_id,
2201            send_task: cx.spawn(async move |this, cx| {
2202                cancel_task.await;
2203                tx.send(f(this, cx).await).ok();
2204            }),
2205        });
2206
2207        cx.spawn(async move |this, cx| {
2208            let response = rx.await;
2209
2210            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
2211                .await?;
2212
2213            this.update(cx, |this, cx| {
2214                if this.parent_session_id.is_none() {
2215                    this.project
2216                        .update(cx, |project, cx| project.set_agent_location(None, cx));
2217                }
2218                let Ok(response) = response else {
2219                    // tx dropped, just return
2220                    return Ok(None);
2221                };
2222
2223                let is_same_turn = this
2224                    .running_turn
2225                    .as_ref()
2226                    .is_some_and(|turn| turn_id == turn.id);
2227
2228                // If the user submitted a follow up message, running_turn might
2229                // already point to a different turn. Therefore we only want to
2230                // take the task if it's the same turn.
2231                if is_same_turn {
2232                    this.running_turn.take();
2233                }
2234
2235                match response {
2236                    Ok(r) => {
2237                        Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2238
2239                        if r.stop_reason == acp::StopReason::MaxTokens {
2240                            this.had_error = true;
2241                            cx.emit(AcpThreadEvent::Error);
2242                            log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2243                            return Err(anyhow!("Max tokens reached"));
2244                        }
2245
2246                        let canceled = matches!(r.stop_reason, acp::StopReason::Cancelled);
2247                        if canceled {
2248                            this.mark_pending_tools_as_canceled();
2249                        }
2250
2251                        if !canceled {
2252                            this.snapshot_completed_plan(cx);
2253                        }
2254
2255                        // Handle refusal - distinguish between user prompt and tool call refusals
2256                        if let acp::StopReason::Refusal = r.stop_reason {
2257                            this.had_error = true;
2258                            if let Some((user_msg_ix, _)) = this.last_user_message() {
2259                                // Check if there's a completed tool call with results after the last user message
2260                                // This indicates the refusal is in response to tool output, not the user's prompt
2261                                let has_completed_tool_call_after_user_msg =
2262                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
2263                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
2264                                            // Check if the tool call has completed and has output
2265                                            matches!(tool_call.status, ToolCallStatus::Completed)
2266                                                && tool_call.raw_output.is_some()
2267                                        } else {
2268                                            false
2269                                        }
2270                                    });
2271
2272                                if has_completed_tool_call_after_user_msg {
2273                                    // Refusal is due to tool output - don't truncate, just notify
2274                                    // The model refused based on what the tool returned
2275                                    cx.emit(AcpThreadEvent::Refusal);
2276                                } else {
2277                                    // User prompt was refused - truncate back to before the user message
2278                                    let range = user_msg_ix..this.entries.len();
2279                                    if range.start < range.end {
2280                                        this.entries.truncate(user_msg_ix);
2281                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
2282                                    }
2283                                    cx.emit(AcpThreadEvent::Refusal);
2284                                }
2285                            } else {
2286                                // No user message found, treat as general refusal
2287                                cx.emit(AcpThreadEvent::Refusal);
2288                            }
2289                        }
2290
2291                        cx.emit(AcpThreadEvent::Stopped(r.stop_reason));
2292                        Ok(Some(r))
2293                    }
2294                    Err(e) => {
2295                        Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2296
2297                        this.had_error = true;
2298                        cx.emit(AcpThreadEvent::Error);
2299                        log::error!("Error in run turn: {:?}", e);
2300                        Err(e)
2301                    }
2302                }
2303            })?
2304        })
2305        .boxed()
2306    }
2307
2308    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2309        let Some(turn) = self.running_turn.take() else {
2310            return Task::ready(());
2311        };
2312        self.connection.cancel(&self.session_id, cx);
2313
2314        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2315        self.mark_pending_tools_as_canceled();
2316
2317        // Wait for the send task to complete
2318        cx.background_spawn(turn.send_task)
2319    }
2320
2321    fn mark_pending_tools_as_canceled(&mut self) {
2322        for entry in self.entries.iter_mut() {
2323            if let AgentThreadEntry::ToolCall(call) = entry {
2324                let cancel = matches!(
2325                    call.status,
2326                    ToolCallStatus::Pending
2327                        | ToolCallStatus::WaitingForConfirmation { .. }
2328                        | ToolCallStatus::InProgress
2329                );
2330
2331                if cancel {
2332                    call.status = ToolCallStatus::Canceled;
2333                }
2334            }
2335        }
2336    }
2337
2338    /// Restores the git working tree to the state at the given checkpoint (if one exists)
2339    pub fn restore_checkpoint(
2340        &mut self,
2341        id: UserMessageId,
2342        cx: &mut Context<Self>,
2343    ) -> Task<Result<()>> {
2344        let Some((_, message)) = self.user_message_mut(&id) else {
2345            return Task::ready(Err(anyhow!("message not found")));
2346        };
2347
2348        let checkpoint = message
2349            .checkpoint
2350            .as_ref()
2351            .map(|c| c.git_checkpoint.clone());
2352
2353        // Cancel any in-progress generation before restoring
2354        let cancel_task = self.cancel(cx);
2355        let rewind = self.rewind(id.clone(), cx);
2356        let git_store = self.project.read(cx).git_store().clone();
2357
2358        cx.spawn(async move |_, cx| {
2359            cancel_task.await;
2360            rewind.await?;
2361            if let Some(checkpoint) = checkpoint {
2362                git_store
2363                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2364                    .await?;
2365            }
2366
2367            Ok(())
2368        })
2369    }
2370
2371    /// Rewinds this thread to before the entry at `index`, removing it and all
2372    /// subsequent entries while rejecting any action_log changes made from that point.
2373    /// Unlike `restore_checkpoint`, this method does not restore from git.
2374    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2375        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2376            return Task::ready(Err(anyhow!("not supported")));
2377        };
2378
2379        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2380        let telemetry = ActionLogTelemetry::from(&*self);
2381        cx.spawn(async move |this, cx| {
2382            cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2383            this.update(cx, |this, cx| {
2384                if let Some((ix, _)) = this.user_message_mut(&id) {
2385                    // Collect all terminals from entries that will be removed
2386                    let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2387                        .iter()
2388                        .flat_map(|entry| entry.terminals())
2389                        .filter_map(|terminal| terminal.read(cx).id().clone().into())
2390                        .collect();
2391
2392                    let range = ix..this.entries.len();
2393                    this.entries.truncate(ix);
2394                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
2395
2396                    // Kill and remove the terminals
2397                    for terminal_id in terminals_to_remove {
2398                        if let Some(terminal) = this.terminals.remove(&terminal_id) {
2399                            terminal.update(cx, |terminal, cx| {
2400                                terminal.kill(cx);
2401                            });
2402                        }
2403                    }
2404                }
2405                this.action_log().update(cx, |action_log, cx| {
2406                    action_log.reject_all_edits(Some(telemetry), cx)
2407                })
2408            })?
2409            .await;
2410            Ok(())
2411        })
2412    }
2413
2414    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2415        let git_store = self.project.read(cx).git_store().clone();
2416
2417        let Some((_, message)) = self.last_user_message() else {
2418            return Task::ready(Ok(()));
2419        };
2420        let Some(user_message_id) = message.id.clone() else {
2421            return Task::ready(Ok(()));
2422        };
2423        let Some(checkpoint) = message.checkpoint.as_ref() else {
2424            return Task::ready(Ok(()));
2425        };
2426        let old_checkpoint = checkpoint.git_checkpoint.clone();
2427
2428        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2429        cx.spawn(async move |this, cx| {
2430            let Some(new_checkpoint) = new_checkpoint
2431                .await
2432                .context("failed to get new checkpoint")
2433                .log_err()
2434            else {
2435                return Ok(());
2436            };
2437
2438            let equal = git_store
2439                .update(cx, |git, cx| {
2440                    git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2441                })
2442                .await
2443                .unwrap_or(true);
2444
2445            this.update(cx, |this, cx| {
2446                if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2447                    if let Some(checkpoint) = message.checkpoint.as_mut() {
2448                        checkpoint.show = !equal;
2449                        cx.emit(AcpThreadEvent::EntryUpdated(ix));
2450                    }
2451                }
2452            })?;
2453
2454            Ok(())
2455        })
2456    }
2457
2458    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2459        self.entries
2460            .iter_mut()
2461            .enumerate()
2462            .rev()
2463            .find_map(|(ix, entry)| {
2464                if let AgentThreadEntry::UserMessage(message) = entry {
2465                    Some((ix, message))
2466                } else {
2467                    None
2468                }
2469            })
2470    }
2471
2472    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2473        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2474            if let AgentThreadEntry::UserMessage(message) = entry {
2475                if message.id.as_ref() == Some(id) {
2476                    Some((ix, message))
2477                } else {
2478                    None
2479                }
2480            } else {
2481                None
2482            }
2483        })
2484    }
2485
2486    pub fn read_text_file(
2487        &self,
2488        path: PathBuf,
2489        line: Option<u32>,
2490        limit: Option<u32>,
2491        reuse_shared_snapshot: bool,
2492        cx: &mut Context<Self>,
2493    ) -> Task<Result<String, acp::Error>> {
2494        // Args are 1-based, move to 0-based
2495        let line = line.unwrap_or_default().saturating_sub(1);
2496        let limit = limit.unwrap_or(u32::MAX);
2497        let project = self.project.clone();
2498        let action_log = self.action_log.clone();
2499        let should_update_agent_location = self.parent_session_id.is_none();
2500        cx.spawn(async move |this, cx| {
2501            let load = project.update(cx, |project, cx| {
2502                let path = project
2503                    .project_path_for_absolute_path(&path, cx)
2504                    .ok_or_else(|| {
2505                        acp::Error::resource_not_found(Some(path.display().to_string()))
2506                    })?;
2507                Ok::<_, acp::Error>(project.open_buffer(path, cx))
2508            })?;
2509
2510            let buffer = load.await?;
2511
2512            let snapshot = if reuse_shared_snapshot {
2513                this.read_with(cx, |this, _| {
2514                    this.shared_buffers.get(&buffer.clone()).cloned()
2515                })
2516                .log_err()
2517                .flatten()
2518            } else {
2519                None
2520            };
2521
2522            let snapshot = if let Some(snapshot) = snapshot {
2523                snapshot
2524            } else {
2525                action_log.update(cx, |action_log, cx| {
2526                    action_log.buffer_read(buffer.clone(), cx);
2527                });
2528
2529                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
2530                this.update(cx, |this, _| {
2531                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2532                })?;
2533                snapshot
2534            };
2535
2536            let max_point = snapshot.max_point();
2537            let start_position = Point::new(line, 0);
2538
2539            if start_position > max_point {
2540                return Err(acp::Error::invalid_params().data(format!(
2541                    "Attempting to read beyond the end of the file, line {}:{}",
2542                    max_point.row + 1,
2543                    max_point.column
2544                )));
2545            }
2546
2547            let start = snapshot.anchor_before(start_position);
2548            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2549
2550            if should_update_agent_location {
2551                project.update(cx, |project, cx| {
2552                    project.set_agent_location(
2553                        Some(AgentLocation {
2554                            buffer: buffer.downgrade(),
2555                            position: start,
2556                        }),
2557                        cx,
2558                    );
2559                });
2560            }
2561
2562            Ok(snapshot.text_for_range(start..end).collect::<String>())
2563        })
2564    }
2565
2566    pub fn write_text_file(
2567        &self,
2568        path: PathBuf,
2569        content: String,
2570        cx: &mut Context<Self>,
2571    ) -> Task<Result<()>> {
2572        let project = self.project.clone();
2573        let action_log = self.action_log.clone();
2574        let should_update_agent_location = self.parent_session_id.is_none();
2575        cx.spawn(async move |this, cx| {
2576            let load = project.update(cx, |project, cx| {
2577                let path = project
2578                    .project_path_for_absolute_path(&path, cx)
2579                    .context("invalid path")?;
2580                anyhow::Ok(project.open_buffer(path, cx))
2581            });
2582            let buffer = load?.await?;
2583            let snapshot = this.update(cx, |this, cx| {
2584                this.shared_buffers
2585                    .get(&buffer)
2586                    .cloned()
2587                    .unwrap_or_else(|| buffer.read(cx).snapshot())
2588            })?;
2589            let edits = cx
2590                .background_executor()
2591                .spawn(async move {
2592                    let old_text = snapshot.text();
2593                    text_diff(old_text.as_str(), &content)
2594                        .into_iter()
2595                        .map(|(range, replacement)| {
2596                            (snapshot.anchor_range_around(range), replacement)
2597                        })
2598                        .collect::<Vec<_>>()
2599                })
2600                .await;
2601
2602            if should_update_agent_location {
2603                project.update(cx, |project, cx| {
2604                    project.set_agent_location(
2605                        Some(AgentLocation {
2606                            buffer: buffer.downgrade(),
2607                            position: edits
2608                                .last()
2609                                .map(|(range, _)| range.end)
2610                                .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2611                        }),
2612                        cx,
2613                    );
2614                });
2615            }
2616
2617            let format_on_save = cx.update(|cx| {
2618                action_log.update(cx, |action_log, cx| {
2619                    action_log.buffer_read(buffer.clone(), cx);
2620                });
2621
2622                let format_on_save = buffer.update(cx, |buffer, cx| {
2623                    buffer.edit(edits, None, cx);
2624
2625                    let settings =
2626                        language::language_settings::LanguageSettings::for_buffer(buffer, cx);
2627
2628                    settings.format_on_save != FormatOnSave::Off
2629                });
2630                action_log.update(cx, |action_log, cx| {
2631                    action_log.buffer_edited(buffer.clone(), cx);
2632                });
2633                format_on_save
2634            });
2635
2636            if format_on_save {
2637                let format_task = project.update(cx, |project, cx| {
2638                    project.format(
2639                        HashSet::from_iter([buffer.clone()]),
2640                        LspFormatTarget::Buffers,
2641                        false,
2642                        FormatTrigger::Save,
2643                        cx,
2644                    )
2645                });
2646                format_task.await.log_err();
2647
2648                action_log.update(cx, |action_log, cx| {
2649                    action_log.buffer_edited(buffer.clone(), cx);
2650                });
2651            }
2652
2653            project
2654                .update(cx, |project, cx| project.save_buffer(buffer, cx))
2655                .await
2656        })
2657    }
2658
2659    pub fn create_terminal(
2660        &self,
2661        command: String,
2662        args: Vec<String>,
2663        extra_env: Vec<acp::EnvVariable>,
2664        cwd: Option<PathBuf>,
2665        output_byte_limit: Option<u64>,
2666        cx: &mut Context<Self>,
2667    ) -> Task<Result<Entity<Terminal>>> {
2668        let env = match &cwd {
2669            Some(dir) => self.project.update(cx, |project, cx| {
2670                project.environment().update(cx, |env, cx| {
2671                    env.directory_environment(dir.as_path().into(), cx)
2672                })
2673            }),
2674            None => Task::ready(None).shared(),
2675        };
2676        let env = cx.spawn(async move |_, _| {
2677            let mut env = env.await.unwrap_or_default();
2678            // Disables paging for `git` and hopefully other commands
2679            env.insert("PAGER".into(), "".into());
2680            for var in extra_env {
2681                env.insert(var.name, var.value);
2682            }
2683            env
2684        });
2685
2686        let project = self.project.clone();
2687        let language_registry = project.read(cx).languages().clone();
2688        let is_windows = project.read(cx).path_style(cx).is_windows();
2689
2690        let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2691        let terminal_task = cx.spawn({
2692            let terminal_id = terminal_id.clone();
2693            async move |_this, cx| {
2694                let env = env.await;
2695                let shell = project
2696                    .update(cx, |project, cx| {
2697                        project
2698                            .remote_client()
2699                            .and_then(|r| r.read(cx).default_system_shell())
2700                    })
2701                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2702                let (task_command, task_args) =
2703                    ShellBuilder::new(&Shell::Program(shell), is_windows)
2704                        .redirect_stdin_to_dev_null()
2705                        .build(Some(command.clone()), &args);
2706                let terminal = project
2707                    .update(cx, |project, cx| {
2708                        project.create_terminal_task(
2709                            task::SpawnInTerminal {
2710                                command: Some(task_command),
2711                                args: task_args,
2712                                cwd: cwd.clone(),
2713                                env,
2714                                ..Default::default()
2715                            },
2716                            cx,
2717                        )
2718                    })
2719                    .await?;
2720
2721                anyhow::Ok(cx.new(|cx| {
2722                    Terminal::new(
2723                        terminal_id,
2724                        &format!("{} {}", command, args.join(" ")),
2725                        cwd,
2726                        output_byte_limit.map(|l| l as usize),
2727                        terminal,
2728                        language_registry,
2729                        cx,
2730                    )
2731                }))
2732            }
2733        });
2734
2735        cx.spawn(async move |this, cx| {
2736            let terminal = terminal_task.await?;
2737            this.update(cx, |this, _cx| {
2738                this.terminals.insert(terminal_id, terminal.clone());
2739                terminal
2740            })
2741        })
2742    }
2743
2744    pub fn kill_terminal(
2745        &mut self,
2746        terminal_id: acp::TerminalId,
2747        cx: &mut Context<Self>,
2748    ) -> Result<()> {
2749        self.terminals
2750            .get(&terminal_id)
2751            .context("Terminal not found")?
2752            .update(cx, |terminal, cx| {
2753                terminal.kill(cx);
2754            });
2755
2756        Ok(())
2757    }
2758
2759    pub fn release_terminal(
2760        &mut self,
2761        terminal_id: acp::TerminalId,
2762        cx: &mut Context<Self>,
2763    ) -> Result<()> {
2764        self.terminals
2765            .remove(&terminal_id)
2766            .context("Terminal not found")?
2767            .update(cx, |terminal, cx| {
2768                terminal.kill(cx);
2769            });
2770
2771        Ok(())
2772    }
2773
2774    pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2775        self.terminals
2776            .get(&terminal_id)
2777            .context("Terminal not found")
2778            .cloned()
2779    }
2780
2781    pub fn to_markdown(&self, cx: &App) -> String {
2782        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2783    }
2784
2785    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2786        cx.emit(AcpThreadEvent::LoadError(error));
2787    }
2788
2789    pub fn register_terminal_created(
2790        &mut self,
2791        terminal_id: acp::TerminalId,
2792        command_label: String,
2793        working_dir: Option<PathBuf>,
2794        output_byte_limit: Option<u64>,
2795        terminal: Entity<::terminal::Terminal>,
2796        cx: &mut Context<Self>,
2797    ) -> Entity<Terminal> {
2798        let language_registry = self.project.read(cx).languages().clone();
2799
2800        let entity = cx.new(|cx| {
2801            Terminal::new(
2802                terminal_id.clone(),
2803                &command_label,
2804                working_dir.clone(),
2805                output_byte_limit.map(|l| l as usize),
2806                terminal,
2807                language_registry,
2808                cx,
2809            )
2810        });
2811        self.terminals.insert(terminal_id.clone(), entity.clone());
2812        entity
2813    }
2814
2815    pub fn mark_as_subagent_output(&mut self, cx: &mut Context<Self>) {
2816        for entry in self.entries.iter_mut().rev() {
2817            if let AgentThreadEntry::AssistantMessage(assistant_message) = entry {
2818                assistant_message.is_subagent_output = true;
2819                cx.notify();
2820                return;
2821            }
2822        }
2823    }
2824
2825    pub fn on_terminal_provider_event(
2826        &mut self,
2827        event: TerminalProviderEvent,
2828        cx: &mut Context<Self>,
2829    ) {
2830        match event {
2831            TerminalProviderEvent::Created {
2832                terminal_id,
2833                label,
2834                cwd,
2835                output_byte_limit,
2836                terminal,
2837            } => {
2838                let entity = self.register_terminal_created(
2839                    terminal_id.clone(),
2840                    label,
2841                    cwd,
2842                    output_byte_limit,
2843                    terminal,
2844                    cx,
2845                );
2846
2847                if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
2848                    for data in chunks.drain(..) {
2849                        entity.update(cx, |term, cx| {
2850                            term.inner().update(cx, |inner, cx| {
2851                                inner.write_output(&data, cx);
2852                            })
2853                        });
2854                    }
2855                }
2856
2857                if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
2858                    entity.update(cx, |_term, cx| {
2859                        cx.notify();
2860                    });
2861                }
2862
2863                cx.notify();
2864            }
2865            TerminalProviderEvent::Output { terminal_id, data } => {
2866                if let Some(entity) = self.terminals.get(&terminal_id) {
2867                    entity.update(cx, |term, cx| {
2868                        term.inner().update(cx, |inner, cx| {
2869                            inner.write_output(&data, cx);
2870                        })
2871                    });
2872                } else {
2873                    self.pending_terminal_output
2874                        .entry(terminal_id)
2875                        .or_default()
2876                        .push(data);
2877                }
2878            }
2879            TerminalProviderEvent::TitleChanged { terminal_id, title } => {
2880                if let Some(entity) = self.terminals.get(&terminal_id) {
2881                    entity.update(cx, |term, cx| {
2882                        term.inner().update(cx, |inner, cx| {
2883                            inner.breadcrumb_text = title;
2884                            cx.emit(::terminal::Event::BreadcrumbsChanged);
2885                        })
2886                    });
2887                }
2888            }
2889            TerminalProviderEvent::Exit {
2890                terminal_id,
2891                status,
2892            } => {
2893                if let Some(entity) = self.terminals.get(&terminal_id) {
2894                    entity.update(cx, |_term, cx| {
2895                        cx.notify();
2896                    });
2897                } else {
2898                    self.pending_terminal_exit.insert(terminal_id, status);
2899                }
2900            }
2901        }
2902    }
2903}
2904
2905fn markdown_for_raw_output(
2906    raw_output: &serde_json::Value,
2907    language_registry: &Arc<LanguageRegistry>,
2908    cx: &mut App,
2909) -> Option<Entity<Markdown>> {
2910    match raw_output {
2911        serde_json::Value::Null => None,
2912        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2913            Markdown::new(
2914                value.to_string().into(),
2915                Some(language_registry.clone()),
2916                None,
2917                cx,
2918            )
2919        })),
2920        serde_json::Value::Number(value) => Some(cx.new(|cx| {
2921            Markdown::new(
2922                value.to_string().into(),
2923                Some(language_registry.clone()),
2924                None,
2925                cx,
2926            )
2927        })),
2928        serde_json::Value::String(value) => Some(cx.new(|cx| {
2929            Markdown::new(
2930                value.clone().into(),
2931                Some(language_registry.clone()),
2932                None,
2933                cx,
2934            )
2935        })),
2936        value => Some(cx.new(|cx| {
2937            let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
2938
2939            Markdown::new(
2940                format!("```json\n{}\n```", pretty_json).into(),
2941                Some(language_registry.clone()),
2942                None,
2943                cx,
2944            )
2945        })),
2946    }
2947}
2948
2949#[cfg(test)]
2950mod tests {
2951    use super::*;
2952    use anyhow::anyhow;
2953    use futures::{channel::mpsc, future::LocalBoxFuture, select};
2954    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2955    use indoc::indoc;
2956    use project::{AgentId, FakeFs, Fs};
2957    use rand::{distr, prelude::*};
2958    use serde_json::json;
2959    use settings::SettingsStore;
2960    use smol::stream::StreamExt as _;
2961    use std::{
2962        any::Any,
2963        cell::RefCell,
2964        path::Path,
2965        rc::Rc,
2966        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2967        time::Duration,
2968    };
2969    use util::{path, path_list::PathList};
2970
2971    fn init_test(cx: &mut TestAppContext) {
2972        env_logger::try_init().ok();
2973        cx.update(|cx| {
2974            let settings_store = SettingsStore::test(cx);
2975            cx.set_global(settings_store);
2976        });
2977    }
2978
2979    #[gpui::test]
2980    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2981        init_test(cx);
2982
2983        let fs = FakeFs::new(cx.executor());
2984        let project = Project::test(fs, [], cx).await;
2985        let connection = Rc::new(FakeAgentConnection::new());
2986        let thread = cx
2987            .update(|cx| {
2988                connection.new_session(
2989                    project,
2990                    PathList::new(&[std::path::Path::new(path!("/test"))]),
2991                    cx,
2992                )
2993            })
2994            .await
2995            .unwrap();
2996
2997        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2998
2999        // Send Output BEFORE Created - should be buffered by acp_thread
3000        thread.update(cx, |thread, cx| {
3001            thread.on_terminal_provider_event(
3002                TerminalProviderEvent::Output {
3003                    terminal_id: terminal_id.clone(),
3004                    data: b"hello buffered".to_vec(),
3005                },
3006                cx,
3007            );
3008        });
3009
3010        // Create a display-only terminal and then send Created
3011        let lower = cx.new(|cx| {
3012            let builder = ::terminal::TerminalBuilder::new_display_only(
3013                ::terminal::terminal_settings::CursorShape::default(),
3014                ::terminal::terminal_settings::AlternateScroll::On,
3015                None,
3016                0,
3017                cx.background_executor(),
3018                PathStyle::local(),
3019            )
3020            .unwrap();
3021            builder.subscribe(cx)
3022        });
3023
3024        thread.update(cx, |thread, cx| {
3025            thread.on_terminal_provider_event(
3026                TerminalProviderEvent::Created {
3027                    terminal_id: terminal_id.clone(),
3028                    label: "Buffered Test".to_string(),
3029                    cwd: None,
3030                    output_byte_limit: None,
3031                    terminal: lower.clone(),
3032                },
3033                cx,
3034            );
3035        });
3036
3037        // After Created, buffered Output should have been flushed into the renderer
3038        let content = thread.read_with(cx, |thread, cx| {
3039            let term = thread.terminal(terminal_id.clone()).unwrap();
3040            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3041        });
3042
3043        assert!(
3044            content.contains("hello buffered"),
3045            "expected buffered output to render, got: {content}"
3046        );
3047    }
3048
3049    #[gpui::test]
3050    async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
3051        init_test(cx);
3052
3053        let fs = FakeFs::new(cx.executor());
3054        let project = Project::test(fs, [], cx).await;
3055        let connection = Rc::new(FakeAgentConnection::new());
3056        let thread = cx
3057            .update(|cx| {
3058                connection.new_session(
3059                    project,
3060                    PathList::new(&[std::path::Path::new(path!("/test"))]),
3061                    cx,
3062                )
3063            })
3064            .await
3065            .unwrap();
3066
3067        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3068
3069        // Send Output BEFORE Created
3070        thread.update(cx, |thread, cx| {
3071            thread.on_terminal_provider_event(
3072                TerminalProviderEvent::Output {
3073                    terminal_id: terminal_id.clone(),
3074                    data: b"pre-exit data".to_vec(),
3075                },
3076                cx,
3077            );
3078        });
3079
3080        // Send Exit BEFORE Created
3081        thread.update(cx, |thread, cx| {
3082            thread.on_terminal_provider_event(
3083                TerminalProviderEvent::Exit {
3084                    terminal_id: terminal_id.clone(),
3085                    status: acp::TerminalExitStatus::new().exit_code(0),
3086                },
3087                cx,
3088            );
3089        });
3090
3091        // Now create a display-only lower-level terminal and send Created
3092        let lower = cx.new(|cx| {
3093            let builder = ::terminal::TerminalBuilder::new_display_only(
3094                ::terminal::terminal_settings::CursorShape::default(),
3095                ::terminal::terminal_settings::AlternateScroll::On,
3096                None,
3097                0,
3098                cx.background_executor(),
3099                PathStyle::local(),
3100            )
3101            .unwrap();
3102            builder.subscribe(cx)
3103        });
3104
3105        thread.update(cx, |thread, cx| {
3106            thread.on_terminal_provider_event(
3107                TerminalProviderEvent::Created {
3108                    terminal_id: terminal_id.clone(),
3109                    label: "Buffered Exit Test".to_string(),
3110                    cwd: None,
3111                    output_byte_limit: None,
3112                    terminal: lower.clone(),
3113                },
3114                cx,
3115            );
3116        });
3117
3118        // Output should be present after Created (flushed from buffer)
3119        let content = thread.read_with(cx, |thread, cx| {
3120            let term = thread.terminal(terminal_id.clone()).unwrap();
3121            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3122        });
3123
3124        assert!(
3125            content.contains("pre-exit data"),
3126            "expected pre-exit data to render, got: {content}"
3127        );
3128    }
3129
3130    /// Test that killing a terminal via Terminal::kill properly:
3131    /// 1. Causes wait_for_exit to complete (doesn't hang forever)
3132    /// 2. The underlying terminal still has the output that was written before the kill
3133    ///
3134    /// This test verifies that the fix to kill_active_task (which now also kills
3135    /// the shell process in addition to the foreground process) properly allows
3136    /// wait_for_exit to complete instead of hanging indefinitely.
3137    #[cfg(unix)]
3138    #[gpui::test]
3139    async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
3140        use std::collections::HashMap;
3141        use task::Shell;
3142        use util::shell_builder::ShellBuilder;
3143
3144        init_test(cx);
3145        cx.executor().allow_parking();
3146
3147        let fs = FakeFs::new(cx.executor());
3148        let project = Project::test(fs, [], cx).await;
3149        let connection = Rc::new(FakeAgentConnection::new());
3150        let thread = cx
3151            .update(|cx| {
3152                connection.new_session(
3153                    project.clone(),
3154                    PathList::new(&[Path::new(path!("/test"))]),
3155                    cx,
3156                )
3157            })
3158            .await
3159            .unwrap();
3160
3161        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3162
3163        // Create a real PTY terminal that runs a command which prints output then sleeps
3164        // We use printf instead of echo and chain with && sleep to ensure proper execution
3165        let (completion_tx, _completion_rx) = smol::channel::unbounded();
3166        let (program, args) = ShellBuilder::new(&Shell::System, false).build(
3167            Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
3168            &[],
3169        );
3170
3171        let builder = cx
3172            .update(|cx| {
3173                ::terminal::TerminalBuilder::new(
3174                    None,
3175                    None,
3176                    task::Shell::WithArguments {
3177                        program,
3178                        args,
3179                        title_override: None,
3180                    },
3181                    HashMap::default(),
3182                    ::terminal::terminal_settings::CursorShape::default(),
3183                    ::terminal::terminal_settings::AlternateScroll::On,
3184                    None,
3185                    vec![],
3186                    0,
3187                    false,
3188                    0,
3189                    Some(completion_tx),
3190                    cx,
3191                    vec![],
3192                    PathStyle::local(),
3193                )
3194            })
3195            .await
3196            .unwrap();
3197
3198        let lower_terminal = cx.new(|cx| builder.subscribe(cx));
3199
3200        // Create the acp_thread Terminal wrapper
3201        thread.update(cx, |thread, cx| {
3202            thread.on_terminal_provider_event(
3203                TerminalProviderEvent::Created {
3204                    terminal_id: terminal_id.clone(),
3205                    label: "printf output_before_kill && sleep 60".to_string(),
3206                    cwd: None,
3207                    output_byte_limit: None,
3208                    terminal: lower_terminal.clone(),
3209                },
3210                cx,
3211            );
3212        });
3213
3214        // Poll until the printf command produces output, rather than using a
3215        // fixed sleep which is flaky on loaded machines.
3216        let deadline = std::time::Instant::now() + Duration::from_secs(10);
3217        loop {
3218            let has_output = thread.read_with(cx, |thread, cx| {
3219                let term = thread
3220                    .terminals
3221                    .get(&terminal_id)
3222                    .expect("terminal not found");
3223                let content = term.read(cx).inner().read(cx).get_content();
3224                content.contains("output_before_kill")
3225            });
3226            if has_output {
3227                break;
3228            }
3229            assert!(
3230                std::time::Instant::now() < deadline,
3231                "Timed out waiting for printf output to appear in terminal",
3232            );
3233            cx.executor().timer(Duration::from_millis(50)).await;
3234        }
3235
3236        // Get the acp_thread Terminal and kill it
3237        let wait_for_exit = thread.update(cx, |thread, cx| {
3238            let term = thread.terminals.get(&terminal_id).unwrap();
3239            let wait_for_exit = term.read(cx).wait_for_exit();
3240            term.update(cx, |term, cx| {
3241                term.kill(cx);
3242            });
3243            wait_for_exit
3244        });
3245
3246        // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
3247        // Before the fix to kill_active_task, this would hang forever because
3248        // only the foreground process was killed, not the shell, so the PTY
3249        // child never exited and wait_for_completed_task never completed.
3250        let exit_result = futures::select! {
3251            result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
3252            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
3253        };
3254
3255        assert!(
3256            exit_result.is_some(),
3257            "wait_for_exit should complete after kill, but it timed out. \
3258            This indicates kill_active_task is not properly killing the shell process."
3259        );
3260
3261        // Give the system a chance to process any pending updates
3262        cx.run_until_parked();
3263
3264        // Verify that the underlying terminal still has the output that was
3265        // written before the kill. This verifies that killing doesn't lose output.
3266        let inner_content = thread.read_with(cx, |thread, cx| {
3267            let term = thread.terminals.get(&terminal_id).unwrap();
3268            term.read(cx).inner().read(cx).get_content()
3269        });
3270
3271        assert!(
3272            inner_content.contains("output_before_kill"),
3273            "Underlying terminal should contain output from before kill, got: {}",
3274            inner_content
3275        );
3276    }
3277
3278    #[gpui::test]
3279    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
3280        init_test(cx);
3281
3282        let fs = FakeFs::new(cx.executor());
3283        let project = Project::test(fs, [], cx).await;
3284        let connection = Rc::new(FakeAgentConnection::new());
3285        let thread = cx
3286            .update(|cx| {
3287                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3288            })
3289            .await
3290            .unwrap();
3291
3292        // Test creating a new user message
3293        thread.update(cx, |thread, cx| {
3294            thread.push_user_content_block(None, "Hello, ".into(), cx);
3295        });
3296
3297        thread.update(cx, |thread, cx| {
3298            assert_eq!(thread.entries.len(), 1);
3299            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3300                assert_eq!(user_msg.id, None);
3301                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
3302            } else {
3303                panic!("Expected UserMessage");
3304            }
3305        });
3306
3307        // Test appending to existing user message
3308        let message_1_id = UserMessageId::new();
3309        thread.update(cx, |thread, cx| {
3310            thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
3311        });
3312
3313        thread.update(cx, |thread, cx| {
3314            assert_eq!(thread.entries.len(), 1);
3315            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3316                assert_eq!(user_msg.id, Some(message_1_id));
3317                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
3318            } else {
3319                panic!("Expected UserMessage");
3320            }
3321        });
3322
3323        // Test creating new user message after assistant message
3324        thread.update(cx, |thread, cx| {
3325            thread.push_assistant_content_block("Assistant response".into(), false, cx);
3326        });
3327
3328        let message_2_id = UserMessageId::new();
3329        thread.update(cx, |thread, cx| {
3330            thread.push_user_content_block(
3331                Some(message_2_id.clone()),
3332                "New user message".into(),
3333                cx,
3334            );
3335        });
3336
3337        thread.update(cx, |thread, cx| {
3338            assert_eq!(thread.entries.len(), 3);
3339            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
3340                assert_eq!(user_msg.id, Some(message_2_id));
3341                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
3342            } else {
3343                panic!("Expected UserMessage at index 2");
3344            }
3345        });
3346    }
3347
3348    #[gpui::test]
3349    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
3350        init_test(cx);
3351
3352        let fs = FakeFs::new(cx.executor());
3353        let project = Project::test(fs, [], cx).await;
3354        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3355            |_, thread, mut cx| {
3356                async move {
3357                    thread.update(&mut cx, |thread, cx| {
3358                        thread
3359                            .handle_session_update(
3360                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3361                                    "Thinking ".into(),
3362                                )),
3363                                cx,
3364                            )
3365                            .unwrap();
3366                        thread
3367                            .handle_session_update(
3368                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3369                                    "hard!".into(),
3370                                )),
3371                                cx,
3372                            )
3373                            .unwrap();
3374                    })?;
3375                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3376                }
3377                .boxed_local()
3378            },
3379        ));
3380
3381        let thread = cx
3382            .update(|cx| {
3383                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3384            })
3385            .await
3386            .unwrap();
3387
3388        thread
3389            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3390            .await
3391            .unwrap();
3392
3393        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3394        assert_eq!(
3395            output,
3396            indoc! {r#"
3397            ## User
3398
3399            Hello from Zed!
3400
3401            ## Assistant
3402
3403            <thinking>
3404            Thinking hard!
3405            </thinking>
3406
3407            "#}
3408        );
3409    }
3410
3411    #[gpui::test]
3412    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
3413        init_test(cx);
3414
3415        let fs = FakeFs::new(cx.executor());
3416        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3417            .await;
3418        let project = Project::test(fs.clone(), [], cx).await;
3419        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3420        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3421        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3422            move |_, thread, mut cx| {
3423                let read_file_tx = read_file_tx.clone();
3424                async move {
3425                    let content = thread
3426                        .update(&mut cx, |thread, cx| {
3427                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3428                        })
3429                        .unwrap()
3430                        .await
3431                        .unwrap();
3432                    assert_eq!(content, "one\ntwo\nthree\n");
3433                    read_file_tx.take().unwrap().send(()).unwrap();
3434                    thread
3435                        .update(&mut cx, |thread, cx| {
3436                            thread.write_text_file(
3437                                path!("/tmp/foo").into(),
3438                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
3439                                cx,
3440                            )
3441                        })
3442                        .unwrap()
3443                        .await
3444                        .unwrap();
3445                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3446                }
3447                .boxed_local()
3448            },
3449        ));
3450
3451        let (worktree, pathbuf) = project
3452            .update(cx, |project, cx| {
3453                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3454            })
3455            .await
3456            .unwrap();
3457        let buffer = project
3458            .update(cx, |project, cx| {
3459                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3460            })
3461            .await
3462            .unwrap();
3463
3464        let thread = cx
3465            .update(|cx| {
3466                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3467            })
3468            .await
3469            .unwrap();
3470
3471        let request = thread.update(cx, |thread, cx| {
3472            thread.send_raw("Extend the count in /tmp/foo", cx)
3473        });
3474        read_file_rx.await.ok();
3475        buffer.update(cx, |buffer, cx| {
3476            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3477        });
3478        cx.run_until_parked();
3479        assert_eq!(
3480            buffer.read_with(cx, |buffer, _| buffer.text()),
3481            "zero\none\ntwo\nthree\nfour\nfive\n"
3482        );
3483        assert_eq!(
3484            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3485            "zero\none\ntwo\nthree\nfour\nfive\n"
3486        );
3487        request.await.unwrap();
3488    }
3489
3490    #[gpui::test]
3491    async fn test_reading_from_line(cx: &mut TestAppContext) {
3492        init_test(cx);
3493
3494        let fs = FakeFs::new(cx.executor());
3495        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3496            .await;
3497        let project = Project::test(fs.clone(), [], cx).await;
3498        project
3499            .update(cx, |project, cx| {
3500                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3501            })
3502            .await
3503            .unwrap();
3504
3505        let connection = Rc::new(FakeAgentConnection::new());
3506
3507        let thread = cx
3508            .update(|cx| {
3509                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3510            })
3511            .await
3512            .unwrap();
3513
3514        // Whole file
3515        let content = thread
3516            .update(cx, |thread, cx| {
3517                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3518            })
3519            .await
3520            .unwrap();
3521
3522        assert_eq!(content, "one\ntwo\nthree\nfour\n");
3523
3524        // Only start line
3525        let content = thread
3526            .update(cx, |thread, cx| {
3527                thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3528            })
3529            .await
3530            .unwrap();
3531
3532        assert_eq!(content, "three\nfour\n");
3533
3534        // Only limit
3535        let content = thread
3536            .update(cx, |thread, cx| {
3537                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3538            })
3539            .await
3540            .unwrap();
3541
3542        assert_eq!(content, "one\ntwo\n");
3543
3544        // Range
3545        let content = thread
3546            .update(cx, |thread, cx| {
3547                thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3548            })
3549            .await
3550            .unwrap();
3551
3552        assert_eq!(content, "two\nthree\n");
3553
3554        // Invalid
3555        let err = thread
3556            .update(cx, |thread, cx| {
3557                thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3558            })
3559            .await
3560            .unwrap_err();
3561
3562        assert_eq!(
3563            err.to_string(),
3564            "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3565        );
3566    }
3567
3568    #[gpui::test]
3569    async fn test_reading_empty_file(cx: &mut TestAppContext) {
3570        init_test(cx);
3571
3572        let fs = FakeFs::new(cx.executor());
3573        fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3574        let project = Project::test(fs.clone(), [], cx).await;
3575        project
3576            .update(cx, |project, cx| {
3577                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3578            })
3579            .await
3580            .unwrap();
3581
3582        let connection = Rc::new(FakeAgentConnection::new());
3583
3584        let thread = cx
3585            .update(|cx| {
3586                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3587            })
3588            .await
3589            .unwrap();
3590
3591        // Whole file
3592        let content = thread
3593            .update(cx, |thread, cx| {
3594                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3595            })
3596            .await
3597            .unwrap();
3598
3599        assert_eq!(content, "");
3600
3601        // Only start line
3602        let content = thread
3603            .update(cx, |thread, cx| {
3604                thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3605            })
3606            .await
3607            .unwrap();
3608
3609        assert_eq!(content, "");
3610
3611        // Only limit
3612        let content = thread
3613            .update(cx, |thread, cx| {
3614                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3615            })
3616            .await
3617            .unwrap();
3618
3619        assert_eq!(content, "");
3620
3621        // Range
3622        let content = thread
3623            .update(cx, |thread, cx| {
3624                thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3625            })
3626            .await
3627            .unwrap();
3628
3629        assert_eq!(content, "");
3630
3631        // Invalid
3632        let err = thread
3633            .update(cx, |thread, cx| {
3634                thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3635            })
3636            .await
3637            .unwrap_err();
3638
3639        assert_eq!(
3640            err.to_string(),
3641            "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3642        );
3643    }
3644    #[gpui::test]
3645    async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3646        init_test(cx);
3647
3648        let fs = FakeFs::new(cx.executor());
3649        fs.insert_tree(path!("/tmp"), json!({})).await;
3650        let project = Project::test(fs.clone(), [], cx).await;
3651        project
3652            .update(cx, |project, cx| {
3653                project.find_or_create_worktree(path!("/tmp"), true, cx)
3654            })
3655            .await
3656            .unwrap();
3657
3658        let connection = Rc::new(FakeAgentConnection::new());
3659
3660        let thread = cx
3661            .update(|cx| {
3662                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3663            })
3664            .await
3665            .unwrap();
3666
3667        // Out of project file
3668        let err = thread
3669            .update(cx, |thread, cx| {
3670                thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3671            })
3672            .await
3673            .unwrap_err();
3674
3675        assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3676    }
3677
3678    #[gpui::test]
3679    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
3680        init_test(cx);
3681
3682        let fs = FakeFs::new(cx.executor());
3683        let project = Project::test(fs, [], cx).await;
3684        let id = acp::ToolCallId::new("test");
3685
3686        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3687            let id = id.clone();
3688            move |_, thread, mut cx| {
3689                let id = id.clone();
3690                async move {
3691                    thread
3692                        .update(&mut cx, |thread, cx| {
3693                            thread.handle_session_update(
3694                                acp::SessionUpdate::ToolCall(
3695                                    acp::ToolCall::new(id.clone(), "Label")
3696                                        .kind(acp::ToolKind::Fetch)
3697                                        .status(acp::ToolCallStatus::InProgress),
3698                                ),
3699                                cx,
3700                            )
3701                        })
3702                        .unwrap()
3703                        .unwrap();
3704                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3705                }
3706                .boxed_local()
3707            }
3708        }));
3709
3710        let thread = cx
3711            .update(|cx| {
3712                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3713            })
3714            .await
3715            .unwrap();
3716
3717        let request = thread.update(cx, |thread, cx| {
3718            thread.send_raw("Fetch https://example.com", cx)
3719        });
3720
3721        run_until_first_tool_call(&thread, cx).await;
3722
3723        thread.read_with(cx, |thread, _| {
3724            assert!(matches!(
3725                thread.entries[1],
3726                AgentThreadEntry::ToolCall(ToolCall {
3727                    status: ToolCallStatus::InProgress,
3728                    ..
3729                })
3730            ));
3731        });
3732
3733        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3734
3735        thread.read_with(cx, |thread, _| {
3736            assert!(matches!(
3737                &thread.entries[1],
3738                AgentThreadEntry::ToolCall(ToolCall {
3739                    status: ToolCallStatus::Canceled,
3740                    ..
3741                })
3742            ));
3743        });
3744
3745        thread
3746            .update(cx, |thread, cx| {
3747                thread.handle_session_update(
3748                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3749                        id,
3750                        acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3751                    )),
3752                    cx,
3753                )
3754            })
3755            .unwrap();
3756
3757        request.await.unwrap();
3758
3759        thread.read_with(cx, |thread, _| {
3760            assert!(matches!(
3761                thread.entries[1],
3762                AgentThreadEntry::ToolCall(ToolCall {
3763                    status: ToolCallStatus::Completed,
3764                    ..
3765                })
3766            ));
3767        });
3768    }
3769
3770    #[gpui::test]
3771    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3772        init_test(cx);
3773        let fs = FakeFs::new(cx.background_executor.clone());
3774        fs.insert_tree(path!("/test"), json!({})).await;
3775        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3776
3777        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3778            move |_, thread, mut cx| {
3779                async move {
3780                    thread
3781                        .update(&mut cx, |thread, cx| {
3782                            thread.handle_session_update(
3783                                acp::SessionUpdate::ToolCall(
3784                                    acp::ToolCall::new("test", "Label")
3785                                        .kind(acp::ToolKind::Edit)
3786                                        .status(acp::ToolCallStatus::Completed)
3787                                        .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3788                                            "/test/test.txt",
3789                                            "foo",
3790                                        ))]),
3791                                ),
3792                                cx,
3793                            )
3794                        })
3795                        .unwrap()
3796                        .unwrap();
3797                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3798                }
3799                .boxed_local()
3800            }
3801        }));
3802
3803        let thread = cx
3804            .update(|cx| {
3805                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3806            })
3807            .await
3808            .unwrap();
3809
3810        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3811            .await
3812            .unwrap();
3813
3814        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3815    }
3816
3817    #[gpui::test(iterations = 10)]
3818    async fn test_checkpoints(cx: &mut TestAppContext) {
3819        init_test(cx);
3820        let fs = FakeFs::new(cx.background_executor.clone());
3821        fs.insert_tree(
3822            path!("/test"),
3823            json!({
3824                ".git": {}
3825            }),
3826        )
3827        .await;
3828        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3829
3830        let simulate_changes = Arc::new(AtomicBool::new(true));
3831        let next_filename = Arc::new(AtomicUsize::new(0));
3832        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3833            let simulate_changes = simulate_changes.clone();
3834            let next_filename = next_filename.clone();
3835            let fs = fs.clone();
3836            move |request, thread, mut cx| {
3837                let fs = fs.clone();
3838                let simulate_changes = simulate_changes.clone();
3839                let next_filename = next_filename.clone();
3840                async move {
3841                    if simulate_changes.load(SeqCst) {
3842                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3843                        fs.write(Path::new(&filename), b"").await?;
3844                    }
3845
3846                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3847                        panic!("expected text content block");
3848                    };
3849                    thread.update(&mut cx, |thread, cx| {
3850                        thread
3851                            .handle_session_update(
3852                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3853                                    content.text.to_uppercase().into(),
3854                                )),
3855                                cx,
3856                            )
3857                            .unwrap();
3858                    })?;
3859                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3860                }
3861                .boxed_local()
3862            }
3863        }));
3864        let thread = cx
3865            .update(|cx| {
3866                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3867            })
3868            .await
3869            .unwrap();
3870
3871        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3872            .await
3873            .unwrap();
3874        thread.read_with(cx, |thread, cx| {
3875            assert_eq!(
3876                thread.to_markdown(cx),
3877                indoc! {"
3878                    ## User (checkpoint)
3879
3880                    Lorem
3881
3882                    ## Assistant
3883
3884                    LOREM
3885
3886                "}
3887            );
3888        });
3889        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3890
3891        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3892            .await
3893            .unwrap();
3894        thread.read_with(cx, |thread, cx| {
3895            assert_eq!(
3896                thread.to_markdown(cx),
3897                indoc! {"
3898                    ## User (checkpoint)
3899
3900                    Lorem
3901
3902                    ## Assistant
3903
3904                    LOREM
3905
3906                    ## User (checkpoint)
3907
3908                    ipsum
3909
3910                    ## Assistant
3911
3912                    IPSUM
3913
3914                "}
3915            );
3916        });
3917        assert_eq!(
3918            fs.files(),
3919            vec![
3920                Path::new(path!("/test/file-0")),
3921                Path::new(path!("/test/file-1"))
3922            ]
3923        );
3924
3925        // Checkpoint isn't stored when there are no changes.
3926        simulate_changes.store(false, SeqCst);
3927        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3928            .await
3929            .unwrap();
3930        thread.read_with(cx, |thread, cx| {
3931            assert_eq!(
3932                thread.to_markdown(cx),
3933                indoc! {"
3934                    ## User (checkpoint)
3935
3936                    Lorem
3937
3938                    ## Assistant
3939
3940                    LOREM
3941
3942                    ## User (checkpoint)
3943
3944                    ipsum
3945
3946                    ## Assistant
3947
3948                    IPSUM
3949
3950                    ## User
3951
3952                    dolor
3953
3954                    ## Assistant
3955
3956                    DOLOR
3957
3958                "}
3959            );
3960        });
3961        assert_eq!(
3962            fs.files(),
3963            vec![
3964                Path::new(path!("/test/file-0")),
3965                Path::new(path!("/test/file-1"))
3966            ]
3967        );
3968
3969        // Rewinding the conversation truncates the history and restores the checkpoint.
3970        thread
3971            .update(cx, |thread, cx| {
3972                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3973                    panic!("unexpected entries {:?}", thread.entries)
3974                };
3975                thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3976            })
3977            .await
3978            .unwrap();
3979        thread.read_with(cx, |thread, cx| {
3980            assert_eq!(
3981                thread.to_markdown(cx),
3982                indoc! {"
3983                    ## User (checkpoint)
3984
3985                    Lorem
3986
3987                    ## Assistant
3988
3989                    LOREM
3990
3991                "}
3992            );
3993        });
3994        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3995    }
3996
3997    #[gpui::test]
3998    async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3999        use std::sync::atomic::AtomicUsize;
4000        init_test(cx);
4001
4002        let fs = FakeFs::new(cx.executor());
4003        let project = Project::test(fs, None, cx).await;
4004
4005        // Create a connection that simulates refusal after tool result
4006        let prompt_count = Arc::new(AtomicUsize::new(0));
4007        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4008            let prompt_count = prompt_count.clone();
4009            move |_request, thread, mut cx| {
4010                let count = prompt_count.fetch_add(1, SeqCst);
4011                async move {
4012                    if count == 0 {
4013                        // First prompt: Generate a tool call with result
4014                        thread.update(&mut cx, |thread, cx| {
4015                            thread
4016                                .handle_session_update(
4017                                    acp::SessionUpdate::ToolCall(
4018                                        acp::ToolCall::new("tool1", "Test Tool")
4019                                            .kind(acp::ToolKind::Fetch)
4020                                            .status(acp::ToolCallStatus::Completed)
4021                                            .raw_input(serde_json::json!({"query": "test"}))
4022                                            .raw_output(serde_json::json!({"result": "inappropriate content"})),
4023                                    ),
4024                                    cx,
4025                                )
4026                                .unwrap();
4027                        })?;
4028
4029                        // Now return refusal because of the tool result
4030                        Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
4031                    } else {
4032                        Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4033                    }
4034                }
4035                .boxed_local()
4036            }
4037        }));
4038
4039        let thread = cx
4040            .update(|cx| {
4041                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4042            })
4043            .await
4044            .unwrap();
4045
4046        // Track if we see a Refusal event
4047        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4048        let saw_refusal_event_captured = saw_refusal_event.clone();
4049        thread.update(cx, |_thread, cx| {
4050            cx.subscribe(
4051                &thread,
4052                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4053                    if matches!(event, AcpThreadEvent::Refusal) {
4054                        *saw_refusal_event_captured.lock().unwrap() = true;
4055                    }
4056                },
4057            )
4058            .detach();
4059        });
4060
4061        // Send a user message - this will trigger tool call and then refusal
4062        let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
4063        cx.background_executor.spawn(send_task).detach();
4064        cx.run_until_parked();
4065
4066        // Verify that:
4067        // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
4068        // 2. The user message was NOT truncated
4069        assert!(
4070            *saw_refusal_event.lock().unwrap(),
4071            "Refusal event should be emitted for tool result refusals"
4072        );
4073
4074        thread.read_with(cx, |thread, _| {
4075            let entries = thread.entries();
4076            assert!(entries.len() >= 2, "Should have user message and tool call");
4077
4078            // Verify user message is still there
4079            assert!(
4080                matches!(entries[0], AgentThreadEntry::UserMessage(_)),
4081                "User message should not be truncated"
4082            );
4083
4084            // Verify tool call is there with result
4085            if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
4086                assert!(
4087                    tool_call.raw_output.is_some(),
4088                    "Tool call should have output"
4089                );
4090            } else {
4091                panic!("Expected tool call at index 1");
4092            }
4093        });
4094    }
4095
4096    #[gpui::test]
4097    async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
4098        init_test(cx);
4099
4100        let fs = FakeFs::new(cx.executor());
4101        let project = Project::test(fs, None, cx).await;
4102
4103        let refuse_next = Arc::new(AtomicBool::new(false));
4104        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4105            let refuse_next = refuse_next.clone();
4106            move |_request, _thread, _cx| {
4107                if refuse_next.load(SeqCst) {
4108                    async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
4109                        .boxed_local()
4110                } else {
4111                    async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
4112                        .boxed_local()
4113                }
4114            }
4115        }));
4116
4117        let thread = cx
4118            .update(|cx| {
4119                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4120            })
4121            .await
4122            .unwrap();
4123
4124        // Track if we see a Refusal event
4125        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4126        let saw_refusal_event_captured = saw_refusal_event.clone();
4127        thread.update(cx, |_thread, cx| {
4128            cx.subscribe(
4129                &thread,
4130                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4131                    if matches!(event, AcpThreadEvent::Refusal) {
4132                        *saw_refusal_event_captured.lock().unwrap() = true;
4133                    }
4134                },
4135            )
4136            .detach();
4137        });
4138
4139        // Send a message that will be refused
4140        refuse_next.store(true, SeqCst);
4141        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4142            .await
4143            .unwrap();
4144
4145        // Verify that a Refusal event WAS emitted for user prompt refusal
4146        assert!(
4147            *saw_refusal_event.lock().unwrap(),
4148            "Refusal event should be emitted for user prompt refusals"
4149        );
4150
4151        // Verify the message was truncated (user prompt refusal)
4152        thread.read_with(cx, |thread, cx| {
4153            assert_eq!(thread.to_markdown(cx), "");
4154        });
4155    }
4156
4157    #[gpui::test]
4158    async fn test_refusal(cx: &mut TestAppContext) {
4159        init_test(cx);
4160        let fs = FakeFs::new(cx.background_executor.clone());
4161        fs.insert_tree(path!("/"), json!({})).await;
4162        let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
4163
4164        let refuse_next = Arc::new(AtomicBool::new(false));
4165        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4166            let refuse_next = refuse_next.clone();
4167            move |request, thread, mut cx| {
4168                let refuse_next = refuse_next.clone();
4169                async move {
4170                    if refuse_next.load(SeqCst) {
4171                        return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
4172                    }
4173
4174                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
4175                        panic!("expected text content block");
4176                    };
4177                    thread.update(&mut cx, |thread, cx| {
4178                        thread
4179                            .handle_session_update(
4180                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
4181                                    content.text.to_uppercase().into(),
4182                                )),
4183                                cx,
4184                            )
4185                            .unwrap();
4186                    })?;
4187                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4188                }
4189                .boxed_local()
4190            }
4191        }));
4192        let thread = cx
4193            .update(|cx| {
4194                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4195            })
4196            .await
4197            .unwrap();
4198
4199        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4200            .await
4201            .unwrap();
4202        thread.read_with(cx, |thread, cx| {
4203            assert_eq!(
4204                thread.to_markdown(cx),
4205                indoc! {"
4206                    ## User
4207
4208                    hello
4209
4210                    ## Assistant
4211
4212                    HELLO
4213
4214                "}
4215            );
4216        });
4217
4218        // Simulate refusing the second message. The message should be truncated
4219        // when a user prompt is refused.
4220        refuse_next.store(true, SeqCst);
4221        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
4222            .await
4223            .unwrap();
4224        thread.read_with(cx, |thread, cx| {
4225            assert_eq!(
4226                thread.to_markdown(cx),
4227                indoc! {"
4228                    ## User
4229
4230                    hello
4231
4232                    ## Assistant
4233
4234                    HELLO
4235
4236                "}
4237            );
4238        });
4239    }
4240
4241    async fn run_until_first_tool_call(
4242        thread: &Entity<AcpThread>,
4243        cx: &mut TestAppContext,
4244    ) -> usize {
4245        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
4246
4247        let subscription = cx.update(|cx| {
4248            cx.subscribe(thread, move |thread, _, cx| {
4249                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
4250                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
4251                        return tx.try_send(ix).unwrap();
4252                    }
4253                }
4254            })
4255        });
4256
4257        select! {
4258            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
4259                panic!("Timeout waiting for tool call")
4260            }
4261            ix = rx.next().fuse() => {
4262                drop(subscription);
4263                ix.unwrap()
4264            }
4265        }
4266    }
4267
4268    #[derive(Clone, Default)]
4269    struct FakeAgentConnection {
4270        auth_methods: Vec<acp::AuthMethod>,
4271        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
4272        set_title_calls: Rc<RefCell<Vec<SharedString>>>,
4273        on_user_message: Option<
4274            Rc<
4275                dyn Fn(
4276                        acp::PromptRequest,
4277                        WeakEntity<AcpThread>,
4278                        AsyncApp,
4279                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4280                    + 'static,
4281            >,
4282        >,
4283    }
4284
4285    impl FakeAgentConnection {
4286        fn new() -> Self {
4287            Self {
4288                auth_methods: Vec::new(),
4289                on_user_message: None,
4290                sessions: Arc::default(),
4291                set_title_calls: Default::default(),
4292            }
4293        }
4294
4295        #[expect(unused)]
4296        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
4297            self.auth_methods = auth_methods;
4298            self
4299        }
4300
4301        fn on_user_message(
4302            mut self,
4303            handler: impl Fn(
4304                acp::PromptRequest,
4305                WeakEntity<AcpThread>,
4306                AsyncApp,
4307            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4308            + 'static,
4309        ) -> Self {
4310            self.on_user_message.replace(Rc::new(handler));
4311            self
4312        }
4313    }
4314
4315    impl AgentConnection for FakeAgentConnection {
4316        fn agent_id(&self) -> AgentId {
4317            AgentId::new("fake")
4318        }
4319
4320        fn telemetry_id(&self) -> SharedString {
4321            "fake".into()
4322        }
4323
4324        fn auth_methods(&self) -> &[acp::AuthMethod] {
4325            &self.auth_methods
4326        }
4327
4328        fn new_session(
4329            self: Rc<Self>,
4330            project: Entity<Project>,
4331            work_dirs: PathList,
4332            cx: &mut App,
4333        ) -> Task<gpui::Result<Entity<AcpThread>>> {
4334            let session_id = acp::SessionId::new(
4335                rand::rng()
4336                    .sample_iter(&distr::Alphanumeric)
4337                    .take(7)
4338                    .map(char::from)
4339                    .collect::<String>(),
4340            );
4341            let action_log = cx.new(|_| ActionLog::new(project.clone()));
4342            let thread = cx.new(|cx| {
4343                AcpThread::new(
4344                    None,
4345                    None,
4346                    Some(work_dirs),
4347                    self.clone(),
4348                    project,
4349                    action_log,
4350                    session_id.clone(),
4351                    watch::Receiver::constant(
4352                        acp::PromptCapabilities::new()
4353                            .image(true)
4354                            .audio(true)
4355                            .embedded_context(true),
4356                    ),
4357                    cx,
4358                )
4359            });
4360            self.sessions.lock().insert(session_id, thread.downgrade());
4361            Task::ready(Ok(thread))
4362        }
4363
4364        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
4365            if self.auth_methods().iter().any(|m| m.id() == &method) {
4366                Task::ready(Ok(()))
4367            } else {
4368                Task::ready(Err(anyhow!("Invalid Auth Method")))
4369            }
4370        }
4371
4372        fn prompt(
4373            &self,
4374            _id: Option<UserMessageId>,
4375            params: acp::PromptRequest,
4376            cx: &mut App,
4377        ) -> Task<gpui::Result<acp::PromptResponse>> {
4378            let sessions = self.sessions.lock();
4379            let thread = sessions.get(&params.session_id).unwrap();
4380            if let Some(handler) = &self.on_user_message {
4381                let handler = handler.clone();
4382                let thread = thread.clone();
4383                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
4384            } else {
4385                Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
4386            }
4387        }
4388
4389        fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
4390
4391        fn truncate(
4392            &self,
4393            session_id: &acp::SessionId,
4394            _cx: &App,
4395        ) -> Option<Rc<dyn AgentSessionTruncate>> {
4396            Some(Rc::new(FakeAgentSessionEditor {
4397                _session_id: session_id.clone(),
4398            }))
4399        }
4400
4401        fn set_title(
4402            &self,
4403            _session_id: &acp::SessionId,
4404            _cx: &App,
4405        ) -> Option<Rc<dyn AgentSessionSetTitle>> {
4406            Some(Rc::new(FakeAgentSessionSetTitle {
4407                calls: self.set_title_calls.clone(),
4408            }))
4409        }
4410
4411        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
4412            self
4413        }
4414    }
4415
4416    struct FakeAgentSessionSetTitle {
4417        calls: Rc<RefCell<Vec<SharedString>>>,
4418    }
4419
4420    impl AgentSessionSetTitle for FakeAgentSessionSetTitle {
4421        fn run(&self, title: SharedString, _cx: &mut App) -> Task<Result<()>> {
4422            self.calls.borrow_mut().push(title);
4423            Task::ready(Ok(()))
4424        }
4425    }
4426
4427    struct FakeAgentSessionEditor {
4428        _session_id: acp::SessionId,
4429    }
4430
4431    impl AgentSessionTruncate for FakeAgentSessionEditor {
4432        fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
4433            Task::ready(Ok(()))
4434        }
4435    }
4436
4437    #[gpui::test]
4438    async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
4439        init_test(cx);
4440
4441        let fs = FakeFs::new(cx.executor());
4442        let project = Project::test(fs, [], cx).await;
4443        let connection = Rc::new(FakeAgentConnection::new());
4444        let thread = cx
4445            .update(|cx| {
4446                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4447            })
4448            .await
4449            .unwrap();
4450
4451        // Try to update a tool call that doesn't exist
4452        let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4453        thread.update(cx, |thread, cx| {
4454            let result = thread.handle_session_update(
4455                acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4456                    nonexistent_id.clone(),
4457                    acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4458                )),
4459                cx,
4460            );
4461
4462            // The update should succeed (not return an error)
4463            assert!(result.is_ok());
4464
4465            // There should now be exactly one entry in the thread
4466            assert_eq!(thread.entries.len(), 1);
4467
4468            // The entry should be a failed tool call
4469            if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4470                assert_eq!(tool_call.id, nonexistent_id);
4471                assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4472                assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4473
4474                // Check that the content contains the error message
4475                assert_eq!(tool_call.content.len(), 1);
4476                if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4477                    match content_block {
4478                        ContentBlock::Markdown { markdown } => {
4479                            let markdown_text = markdown.read(cx).source();
4480                            assert!(markdown_text.contains("Tool call not found"));
4481                        }
4482                        ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4483                        ContentBlock::ResourceLink { .. } => {
4484                            panic!("Expected markdown content, got resource link")
4485                        }
4486                        ContentBlock::Image { .. } => {
4487                            panic!("Expected markdown content, got image")
4488                        }
4489                    }
4490                } else {
4491                    panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4492                }
4493            } else {
4494                panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4495            }
4496        });
4497    }
4498
4499    /// Tests that restoring a checkpoint properly cleans up terminals that were
4500    /// created after that checkpoint, and cancels any in-progress generation.
4501    ///
4502    /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4503    /// that were started after that checkpoint should be terminated, and any in-progress
4504    /// AI generation should be canceled.
4505    #[gpui::test]
4506    async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4507        init_test(cx);
4508
4509        let fs = FakeFs::new(cx.executor());
4510        let project = Project::test(fs, [], cx).await;
4511        let connection = Rc::new(FakeAgentConnection::new());
4512        let thread = cx
4513            .update(|cx| {
4514                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4515            })
4516            .await
4517            .unwrap();
4518
4519        // Send first user message to create a checkpoint
4520        cx.update(|cx| {
4521            thread.update(cx, |thread, cx| {
4522                thread.send(vec!["first message".into()], cx)
4523            })
4524        })
4525        .await
4526        .unwrap();
4527
4528        // Send second message (creates another checkpoint) - we'll restore to this one
4529        cx.update(|cx| {
4530            thread.update(cx, |thread, cx| {
4531                thread.send(vec!["second message".into()], cx)
4532            })
4533        })
4534        .await
4535        .unwrap();
4536
4537        // Create 2 terminals BEFORE the checkpoint that have completed running
4538        let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4539        let mock_terminal_1 = cx.new(|cx| {
4540            let builder = ::terminal::TerminalBuilder::new_display_only(
4541                ::terminal::terminal_settings::CursorShape::default(),
4542                ::terminal::terminal_settings::AlternateScroll::On,
4543                None,
4544                0,
4545                cx.background_executor(),
4546                PathStyle::local(),
4547            )
4548            .unwrap();
4549            builder.subscribe(cx)
4550        });
4551
4552        thread.update(cx, |thread, cx| {
4553            thread.on_terminal_provider_event(
4554                TerminalProviderEvent::Created {
4555                    terminal_id: terminal_id_1.clone(),
4556                    label: "echo 'first'".to_string(),
4557                    cwd: Some(PathBuf::from("/test")),
4558                    output_byte_limit: None,
4559                    terminal: mock_terminal_1.clone(),
4560                },
4561                cx,
4562            );
4563        });
4564
4565        thread.update(cx, |thread, cx| {
4566            thread.on_terminal_provider_event(
4567                TerminalProviderEvent::Output {
4568                    terminal_id: terminal_id_1.clone(),
4569                    data: b"first\n".to_vec(),
4570                },
4571                cx,
4572            );
4573        });
4574
4575        thread.update(cx, |thread, cx| {
4576            thread.on_terminal_provider_event(
4577                TerminalProviderEvent::Exit {
4578                    terminal_id: terminal_id_1.clone(),
4579                    status: acp::TerminalExitStatus::new().exit_code(0),
4580                },
4581                cx,
4582            );
4583        });
4584
4585        let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4586        let mock_terminal_2 = cx.new(|cx| {
4587            let builder = ::terminal::TerminalBuilder::new_display_only(
4588                ::terminal::terminal_settings::CursorShape::default(),
4589                ::terminal::terminal_settings::AlternateScroll::On,
4590                None,
4591                0,
4592                cx.background_executor(),
4593                PathStyle::local(),
4594            )
4595            .unwrap();
4596            builder.subscribe(cx)
4597        });
4598
4599        thread.update(cx, |thread, cx| {
4600            thread.on_terminal_provider_event(
4601                TerminalProviderEvent::Created {
4602                    terminal_id: terminal_id_2.clone(),
4603                    label: "echo 'second'".to_string(),
4604                    cwd: Some(PathBuf::from("/test")),
4605                    output_byte_limit: None,
4606                    terminal: mock_terminal_2.clone(),
4607                },
4608                cx,
4609            );
4610        });
4611
4612        thread.update(cx, |thread, cx| {
4613            thread.on_terminal_provider_event(
4614                TerminalProviderEvent::Output {
4615                    terminal_id: terminal_id_2.clone(),
4616                    data: b"second\n".to_vec(),
4617                },
4618                cx,
4619            );
4620        });
4621
4622        thread.update(cx, |thread, cx| {
4623            thread.on_terminal_provider_event(
4624                TerminalProviderEvent::Exit {
4625                    terminal_id: terminal_id_2.clone(),
4626                    status: acp::TerminalExitStatus::new().exit_code(0),
4627                },
4628                cx,
4629            );
4630        });
4631
4632        // Get the second message ID to restore to
4633        let second_message_id = thread.read_with(cx, |thread, _| {
4634            // At this point we have:
4635            // - Index 0: First user message (with checkpoint)
4636            // - Index 1: Second user message (with checkpoint)
4637            // No assistant responses because FakeAgentConnection just returns EndTurn
4638            let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4639                panic!("expected user message at index 1");
4640            };
4641            message.id.clone().unwrap()
4642        });
4643
4644        // Create a terminal AFTER the checkpoint we'll restore to.
4645        // This simulates the AI agent starting a long-running terminal command.
4646        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4647        let mock_terminal = cx.new(|cx| {
4648            let builder = ::terminal::TerminalBuilder::new_display_only(
4649                ::terminal::terminal_settings::CursorShape::default(),
4650                ::terminal::terminal_settings::AlternateScroll::On,
4651                None,
4652                0,
4653                cx.background_executor(),
4654                PathStyle::local(),
4655            )
4656            .unwrap();
4657            builder.subscribe(cx)
4658        });
4659
4660        // Register the terminal as created
4661        thread.update(cx, |thread, cx| {
4662            thread.on_terminal_provider_event(
4663                TerminalProviderEvent::Created {
4664                    terminal_id: terminal_id.clone(),
4665                    label: "sleep 1000".to_string(),
4666                    cwd: Some(PathBuf::from("/test")),
4667                    output_byte_limit: None,
4668                    terminal: mock_terminal.clone(),
4669                },
4670                cx,
4671            );
4672        });
4673
4674        // Simulate the terminal producing output (still running)
4675        thread.update(cx, |thread, cx| {
4676            thread.on_terminal_provider_event(
4677                TerminalProviderEvent::Output {
4678                    terminal_id: terminal_id.clone(),
4679                    data: b"terminal is running...\n".to_vec(),
4680                },
4681                cx,
4682            );
4683        });
4684
4685        // Create a tool call entry that references this terminal
4686        // This represents the agent requesting a terminal command
4687        thread.update(cx, |thread, cx| {
4688            thread
4689                .handle_session_update(
4690                    acp::SessionUpdate::ToolCall(
4691                        acp::ToolCall::new("terminal-tool-1", "Running command")
4692                            .kind(acp::ToolKind::Execute)
4693                            .status(acp::ToolCallStatus::InProgress)
4694                            .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4695                                terminal_id.clone(),
4696                            ))])
4697                            .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4698                    ),
4699                    cx,
4700                )
4701                .unwrap();
4702        });
4703
4704        // Verify terminal exists and is in the thread
4705        let terminal_exists_before =
4706            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4707        assert!(
4708            terminal_exists_before,
4709            "Terminal should exist before checkpoint restore"
4710        );
4711
4712        // Verify the terminal's underlying task is still running (not completed)
4713        let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4714            let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4715            terminal_entity.read_with(cx, |term, _cx| {
4716                term.output().is_none() // output is None means it's still running
4717            })
4718        });
4719        assert!(
4720            terminal_running_before,
4721            "Terminal should be running before checkpoint restore"
4722        );
4723
4724        // Verify we have the expected entries before restore
4725        let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4726        assert!(
4727            entry_count_before > 1,
4728            "Should have multiple entries before restore"
4729        );
4730
4731        // Restore the checkpoint to the second message.
4732        // This should:
4733        // 1. Cancel any in-progress generation (via the cancel() call)
4734        // 2. Remove the terminal that was created after that point
4735        thread
4736            .update(cx, |thread, cx| {
4737                thread.restore_checkpoint(second_message_id, cx)
4738            })
4739            .await
4740            .unwrap();
4741
4742        // Verify that no send_task is in progress after restore
4743        // (cancel() clears the send_task)
4744        let has_send_task_after = thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4745        assert!(
4746            !has_send_task_after,
4747            "Should not have a send_task after restore (cancel should have cleared it)"
4748        );
4749
4750        // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4751        let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4752        assert_eq!(
4753            entry_count, 1,
4754            "Should have 1 entry after restore (only the first user message)"
4755        );
4756
4757        // Verify the 2 completed terminals from before the checkpoint still exist
4758        let terminal_1_exists = thread.read_with(cx, |thread, _| {
4759            thread.terminals.contains_key(&terminal_id_1)
4760        });
4761        assert!(
4762            terminal_1_exists,
4763            "Terminal 1 (from before checkpoint) should still exist"
4764        );
4765
4766        let terminal_2_exists = thread.read_with(cx, |thread, _| {
4767            thread.terminals.contains_key(&terminal_id_2)
4768        });
4769        assert!(
4770            terminal_2_exists,
4771            "Terminal 2 (from before checkpoint) should still exist"
4772        );
4773
4774        // Verify they're still in completed state
4775        let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4776            let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4777            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4778        });
4779        assert!(terminal_1_completed, "Terminal 1 should still be completed");
4780
4781        let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4782            let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4783            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4784        });
4785        assert!(terminal_2_completed, "Terminal 2 should still be completed");
4786
4787        // Verify the running terminal (created after checkpoint) was removed
4788        let terminal_3_exists =
4789            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4790        assert!(
4791            !terminal_3_exists,
4792            "Terminal 3 (created after checkpoint) should have been removed"
4793        );
4794
4795        // Verify total count is 2 (the two from before the checkpoint)
4796        let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4797        assert_eq!(
4798            terminal_count, 2,
4799            "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4800        );
4801    }
4802
4803    /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4804    /// even when a new user message is added while the async checkpoint comparison is in progress.
4805    ///
4806    /// This is a regression test for a bug where update_last_checkpoint would fail with
4807    /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4808    /// update_last_checkpoint started and when its async closure ran.
4809    #[gpui::test]
4810    async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4811        init_test(cx);
4812
4813        let fs = FakeFs::new(cx.executor());
4814        fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4815            .await;
4816        let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4817
4818        let handler_done = Arc::new(AtomicBool::new(false));
4819        let handler_done_clone = handler_done.clone();
4820        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4821            move |_, _thread, _cx| {
4822                handler_done_clone.store(true, SeqCst);
4823                async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4824            },
4825        ));
4826
4827        let thread = cx
4828            .update(|cx| {
4829                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4830            })
4831            .await
4832            .unwrap();
4833
4834        let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4835        let send_task = cx.background_executor.spawn(send_future);
4836
4837        // Tick until handler completes, then a few more to let update_last_checkpoint start
4838        while !handler_done.load(SeqCst) {
4839            cx.executor().tick();
4840        }
4841        for _ in 0..5 {
4842            cx.executor().tick();
4843        }
4844
4845        thread.update(cx, |thread, cx| {
4846            thread.push_entry(
4847                AgentThreadEntry::UserMessage(UserMessage {
4848                    id: Some(UserMessageId::new()),
4849                    content: ContentBlock::Empty,
4850                    chunks: vec!["Injected message (no checkpoint)".into()],
4851                    checkpoint: None,
4852                    indented: false,
4853                }),
4854                cx,
4855            );
4856        });
4857
4858        cx.run_until_parked();
4859        let result = send_task.await;
4860
4861        assert!(
4862            result.is_ok(),
4863            "send should succeed even when new message added during update_last_checkpoint: {:?}",
4864            result.err()
4865        );
4866    }
4867
4868    /// Tests that when a follow-up message is sent during generation,
4869    /// the first turn completing does NOT clear `running_turn` because
4870    /// it now belongs to the second turn.
4871    #[gpui::test]
4872    async fn test_follow_up_message_during_generation_does_not_clear_turn(cx: &mut TestAppContext) {
4873        init_test(cx);
4874
4875        let fs = FakeFs::new(cx.executor());
4876        let project = Project::test(fs, [], cx).await;
4877
4878        // First handler waits for this signal before completing
4879        let (first_complete_tx, first_complete_rx) = futures::channel::oneshot::channel::<()>();
4880        let first_complete_rx = RefCell::new(Some(first_complete_rx));
4881
4882        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4883            move |params, _thread, _cx| {
4884                let first_complete_rx = first_complete_rx.borrow_mut().take();
4885                let is_first = params
4886                    .prompt
4887                    .iter()
4888                    .any(|c| matches!(c, acp::ContentBlock::Text(t) if t.text.contains("first")));
4889
4890                async move {
4891                    if is_first {
4892                        // First handler waits until signaled
4893                        if let Some(rx) = first_complete_rx {
4894                            rx.await.ok();
4895                        }
4896                    }
4897                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4898                }
4899                .boxed_local()
4900            }
4901        }));
4902
4903        let thread = cx
4904            .update(|cx| {
4905                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4906            })
4907            .await
4908            .unwrap();
4909
4910        // Send first message (turn_id=1) - handler will block
4911        let first_request = thread.update(cx, |thread, cx| thread.send_raw("first", cx));
4912        assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 1);
4913
4914        // Send second message (turn_id=2) while first is still blocked
4915        // This calls cancel() which takes turn 1's running_turn and sets turn 2's
4916        let second_request = thread.update(cx, |thread, cx| thread.send_raw("second", cx));
4917        assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 2);
4918
4919        let running_turn_after_second_send =
4920            thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4921        assert_eq!(
4922            running_turn_after_second_send,
4923            Some(2),
4924            "running_turn should be set to turn 2 after sending second message"
4925        );
4926
4927        // Now signal first handler to complete
4928        first_complete_tx.send(()).ok();
4929
4930        // First request completes - should NOT clear running_turn
4931        // because running_turn now belongs to turn 2
4932        first_request.await.unwrap();
4933
4934        let running_turn_after_first =
4935            thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4936        assert_eq!(
4937            running_turn_after_first,
4938            Some(2),
4939            "first turn completing should not clear running_turn (belongs to turn 2)"
4940        );
4941
4942        // Second request completes - SHOULD clear running_turn
4943        second_request.await.unwrap();
4944
4945        let running_turn_after_second =
4946            thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4947        assert!(
4948            !running_turn_after_second,
4949            "second turn completing should clear running_turn"
4950        );
4951    }
4952
4953    #[gpui::test]
4954    async fn test_send_returns_cancelled_response_and_marks_tools_as_cancelled(
4955        cx: &mut TestAppContext,
4956    ) {
4957        init_test(cx);
4958
4959        let fs = FakeFs::new(cx.executor());
4960        let project = Project::test(fs, [], cx).await;
4961
4962        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4963            move |_params, thread, mut cx| {
4964                async move {
4965                    thread
4966                        .update(&mut cx, |thread, cx| {
4967                            thread.handle_session_update(
4968                                acp::SessionUpdate::ToolCall(
4969                                    acp::ToolCall::new(
4970                                        acp::ToolCallId::new("test-tool"),
4971                                        "Test Tool",
4972                                    )
4973                                    .kind(acp::ToolKind::Fetch)
4974                                    .status(acp::ToolCallStatus::InProgress),
4975                                ),
4976                                cx,
4977                            )
4978                        })
4979                        .unwrap()
4980                        .unwrap();
4981
4982                    Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
4983                }
4984                .boxed_local()
4985            },
4986        ));
4987
4988        let thread = cx
4989            .update(|cx| {
4990                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4991            })
4992            .await
4993            .unwrap();
4994
4995        let response = thread
4996            .update(cx, |thread, cx| thread.send_raw("test message", cx))
4997            .await;
4998
4999        let response = response
5000            .expect("send should succeed")
5001            .expect("should have response");
5002        assert_eq!(
5003            response.stop_reason,
5004            acp::StopReason::Cancelled,
5005            "response should have Cancelled stop_reason"
5006        );
5007
5008        thread.read_with(cx, |thread, _| {
5009            let tool_entry = thread
5010                .entries
5011                .iter()
5012                .find_map(|e| {
5013                    if let AgentThreadEntry::ToolCall(call) = e {
5014                        Some(call)
5015                    } else {
5016                        None
5017                    }
5018                })
5019                .expect("should have tool call entry");
5020
5021            assert!(
5022                matches!(tool_entry.status, ToolCallStatus::Canceled),
5023                "tool should be marked as Canceled when response is Cancelled, got {:?}",
5024                tool_entry.status
5025            );
5026        });
5027    }
5028
5029    #[gpui::test]
5030    async fn test_provisional_title_replaced_by_real_title(cx: &mut TestAppContext) {
5031        init_test(cx);
5032
5033        let fs = FakeFs::new(cx.executor());
5034        let project = Project::test(fs, [], cx).await;
5035        let connection = Rc::new(FakeAgentConnection::new());
5036        let set_title_calls = connection.set_title_calls.clone();
5037
5038        let thread = cx
5039            .update(|cx| {
5040                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5041            })
5042            .await
5043            .unwrap();
5044
5045        // Initial title is the default.
5046        thread.read_with(cx, |thread, _| {
5047            assert_eq!(thread.title(), None);
5048        });
5049
5050        // Setting a provisional title updates the display title.
5051        thread.update(cx, |thread, cx| {
5052            thread.set_provisional_title("Hello, can you help…".into(), cx);
5053        });
5054        thread.read_with(cx, |thread, _| {
5055            assert_eq!(
5056                thread.title().as_ref().map(|s| s.as_str()),
5057                Some("Hello, can you help…")
5058            );
5059        });
5060
5061        // The provisional title should NOT have propagated to the connection.
5062        assert_eq!(
5063            set_title_calls.borrow().len(),
5064            0,
5065            "provisional title should not propagate to the connection"
5066        );
5067
5068        // When the real title arrives via set_title, it replaces the
5069        // provisional title and propagates to the connection.
5070        let task = thread.update(cx, |thread, cx| {
5071            thread.set_title("Helping with Rust question".into(), cx)
5072        });
5073        task.await.expect("set_title should succeed");
5074        thread.read_with(cx, |thread, _| {
5075            assert_eq!(
5076                thread.title().as_ref().map(|s| s.as_str()),
5077                Some("Helping with Rust question")
5078            );
5079        });
5080        assert_eq!(
5081            set_title_calls.borrow().as_slice(),
5082            &[SharedString::from("Helping with Rust question")],
5083            "real title should propagate to the connection"
5084        );
5085    }
5086
5087    #[gpui::test]
5088    async fn test_session_info_update_replaces_provisional_title_and_emits_event(
5089        cx: &mut TestAppContext,
5090    ) {
5091        init_test(cx);
5092
5093        let fs = FakeFs::new(cx.executor());
5094        let project = Project::test(fs, [], cx).await;
5095        let connection = Rc::new(FakeAgentConnection::new());
5096
5097        let thread = cx
5098            .update(|cx| {
5099                connection.clone().new_session(
5100                    project,
5101                    PathList::new(&[Path::new(path!("/test"))]),
5102                    cx,
5103                )
5104            })
5105            .await
5106            .unwrap();
5107
5108        let title_updated_events = Rc::new(RefCell::new(0usize));
5109        let title_updated_events_for_subscription = title_updated_events.clone();
5110        thread.update(cx, |_thread, cx| {
5111            cx.subscribe(
5112                &thread,
5113                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
5114                    if matches!(event, AcpThreadEvent::TitleUpdated) {
5115                        *title_updated_events_for_subscription.borrow_mut() += 1;
5116                    }
5117                },
5118            )
5119            .detach();
5120        });
5121
5122        thread.update(cx, |thread, cx| {
5123            thread.set_provisional_title("Hello, can you help…".into(), cx);
5124        });
5125        assert_eq!(
5126            *title_updated_events.borrow(),
5127            1,
5128            "setting a provisional title should emit TitleUpdated"
5129        );
5130
5131        let result = thread.update(cx, |thread, cx| {
5132            thread.handle_session_update(
5133                acp::SessionUpdate::SessionInfoUpdate(
5134                    acp::SessionInfoUpdate::new().title("Helping with Rust question"),
5135                ),
5136                cx,
5137            )
5138        });
5139        result.expect("session info update should succeed");
5140
5141        thread.read_with(cx, |thread, _| {
5142            assert_eq!(
5143                thread.title().as_ref().map(|s| s.as_str()),
5144                Some("Helping with Rust question")
5145            );
5146            assert!(
5147                !thread.has_provisional_title(),
5148                "session info title update should clear provisional title"
5149            );
5150        });
5151
5152        assert_eq!(
5153            *title_updated_events.borrow(),
5154            2,
5155            "session info title update should emit TitleUpdated"
5156        );
5157        assert!(
5158            connection.set_title_calls.borrow().is_empty(),
5159            "session info title update should not propagate back to the connection"
5160        );
5161    }
5162}