acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5use action_log::{ActionLog, ActionLogTelemetry};
   6use agent_client_protocol::{self as acp};
   7use anyhow::{Context as _, Result, anyhow};
   8use collections::HashSet;
   9pub use connection::*;
  10pub use diff::*;
  11use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  12use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  13use itertools::Itertools;
  14use language::language_settings::FormatOnSave;
  15use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  16use markdown::Markdown;
  17pub use mention::*;
  18use project::lsp_store::{FormatTrigger, LspFormatTarget};
  19use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  20use serde::{Deserialize, Serialize};
  21use serde_json::to_string_pretty;
  22use std::collections::HashMap;
  23use std::error::Error;
  24use std::fmt::{Formatter, Write};
  25use std::ops::Range;
  26use std::process::ExitStatus;
  27use std::rc::Rc;
  28use std::time::{Duration, Instant};
  29use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  30use task::{Shell, ShellBuilder};
  31pub use terminal::*;
  32use text::Bias;
  33use ui::App;
  34use util::markdown::MarkdownEscaped;
  35use util::path_list::PathList;
  36use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
  37use uuid::Uuid;
  38
/// Key used in ACP ToolCall meta to store the tool's programmatic name.
///
/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
/// The value is written by [`meta_with_tool_name`] and read back by
/// [`tool_name_from_meta`].
pub const TOOL_NAME_META_KEY: &str = "tool_name";
  42
  43/// Helper to extract tool name from ACP meta
  44pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
  45    meta.as_ref()
  46        .and_then(|m| m.get(TOOL_NAME_META_KEY))
  47        .and_then(|v| v.as_str())
  48        .map(|s| SharedString::from(s.to_owned()))
  49}
  50
  51/// Helper to create meta with tool name
  52pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
  53    acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
  54}
  55
/// Key used in ACP ToolCall meta to store the session id and message indexes
/// of a subagent run, serialized as a [`SubagentSessionInfo`].
pub const SUBAGENT_SESSION_INFO_META_KEY: &str = "subagent_session_info";
  58
/// Describes the subagent session associated with a tool call.
///
/// Stored (de)serialized in ACP meta under [`SUBAGENT_SESSION_INFO_META_KEY`].
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SubagentSessionInfo {
    /// The session id of the subagent session that was spawned
    pub session_id: acp::SessionId,
    /// The index of the message of the start of the "turn" run by this tool call
    pub message_start_index: usize,
    /// The index of the output of the message that the subagent has returned.
    /// Omitted from the serialized form while it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message_end_index: Option<usize>,
}
  69
  70/// Helper to extract subagent session id from ACP meta
  71pub fn subagent_session_info_from_meta(meta: &Option<acp::Meta>) -> Option<SubagentSessionInfo> {
  72    meta.as_ref()
  73        .and_then(|m| m.get(SUBAGENT_SESSION_INFO_META_KEY))
  74        .and_then(|v| serde_json::from_value(v.clone()).ok())
  75}
  76
/// A message authored by the user within a thread.
#[derive(Debug)]
pub struct UserMessage {
    /// Identifier of the message, if it has one.
    pub id: Option<UserMessageId>,
    /// Renderable content; this is what `to_markdown` emits.
    pub content: ContentBlock,
    /// The raw ACP content blocks of the message.
    // NOTE(review): presumably `content` is the combined rendering of these
    // chunks; confirm against the code that constructs `UserMessage`.
    pub chunks: Vec<acp::ContentBlock>,
    /// Optional checkpoint attached to this message.
    pub checkpoint: Option<Checkpoint>,
    /// Value reported by `AgentThreadEntry::is_indented` for this message.
    pub indented: bool,
}
  85
/// A git checkpoint attached to a [`UserMessage`].
#[derive(Debug)]
pub struct Checkpoint {
    /// The underlying git store checkpoint.
    git_checkpoint: GitStoreCheckpoint,
    /// When `true`, the message's markdown heading is rendered as
    /// `## User (checkpoint)`.
    pub show: bool,
}
  91
  92impl UserMessage {
  93    fn to_markdown(&self, cx: &App) -> String {
  94        let mut markdown = String::new();
  95        if self
  96            .checkpoint
  97            .as_ref()
  98            .is_some_and(|checkpoint| checkpoint.show)
  99        {
 100            writeln!(markdown, "## User (checkpoint)").unwrap();
 101        } else {
 102            writeln!(markdown, "## User").unwrap();
 103        }
 104        writeln!(markdown).unwrap();
 105        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
 106        writeln!(markdown).unwrap();
 107        markdown
 108    }
 109}
 110
/// A message produced by the assistant, made up of ordered chunks.
#[derive(Debug, PartialEq)]
pub struct AssistantMessage {
    /// The message and thought chunks, in order.
    pub chunks: Vec<AssistantMessageChunk>,
    /// Value reported by `AgentThreadEntry::is_indented` for this message.
    pub indented: bool,
    /// Flag marking this message as subagent output.
    // NOTE(review): nothing in this part of the file reads this flag; confirm
    // its exact meaning with the code that sets it.
    pub is_subagent_output: bool,
}
 117
 118impl AssistantMessage {
 119    pub fn to_markdown(&self, cx: &App) -> String {
 120        format!(
 121            "## Assistant\n\n{}\n\n",
 122            self.chunks
 123                .iter()
 124                .map(|chunk| chunk.to_markdown(cx))
 125                .join("\n\n")
 126        )
 127    }
 128}
 129
/// One piece of an [`AssistantMessage`].
#[derive(Debug, PartialEq)]
pub enum AssistantMessageChunk {
    /// Regular assistant output; rendered to markdown as-is.
    Message { block: ContentBlock },
    /// Assistant reasoning; rendered to markdown wrapped in `<thinking>` tags.
    Thought { block: ContentBlock },
}
 135
 136impl AssistantMessageChunk {
 137    pub fn from_str(
 138        chunk: &str,
 139        language_registry: &Arc<LanguageRegistry>,
 140        path_style: PathStyle,
 141        cx: &mut App,
 142    ) -> Self {
 143        Self::Message {
 144            block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
 145        }
 146    }
 147
 148    fn to_markdown(&self, cx: &App) -> String {
 149        match self {
 150            Self::Message { block } => block.to_markdown(cx).to_string(),
 151            Self::Thought { block } => {
 152                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 153            }
 154        }
 155    }
 156}
 157
/// A single entry in an agent thread.
#[derive(Debug)]
pub enum AgentThreadEntry {
    /// A message written by the user.
    UserMessage(UserMessage),
    /// A message produced by the assistant.
    AssistantMessage(AssistantMessage),
    /// A tool invocation.
    ToolCall(ToolCall),
    /// A plan's entries; rendered to markdown as a checked-off list.
    CompletedPlan(Vec<PlanEntry>),
}
 165
 166impl AgentThreadEntry {
 167    pub fn is_indented(&self) -> bool {
 168        match self {
 169            Self::UserMessage(message) => message.indented,
 170            Self::AssistantMessage(message) => message.indented,
 171            Self::ToolCall(_) => false,
 172            Self::CompletedPlan(_) => false,
 173        }
 174    }
 175
 176    pub fn to_markdown(&self, cx: &App) -> String {
 177        match self {
 178            Self::UserMessage(message) => message.to_markdown(cx),
 179            Self::AssistantMessage(message) => message.to_markdown(cx),
 180            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 181            Self::CompletedPlan(entries) => {
 182                let mut md = String::from("## Plan\n\n");
 183                for entry in entries {
 184                    let source = entry.content.read(cx).source().to_string();
 185                    md.push_str(&format!("- [x] {}\n", source));
 186                }
 187                md
 188            }
 189        }
 190    }
 191
 192    pub fn user_message(&self) -> Option<&UserMessage> {
 193        if let AgentThreadEntry::UserMessage(message) = self {
 194            Some(message)
 195        } else {
 196            None
 197        }
 198    }
 199
 200    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 201        if let AgentThreadEntry::ToolCall(call) = self {
 202            itertools::Either::Left(call.diffs())
 203        } else {
 204            itertools::Either::Right(std::iter::empty())
 205        }
 206    }
 207
 208    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 209        if let AgentThreadEntry::ToolCall(call) = self {
 210            itertools::Either::Left(call.terminals())
 211        } else {
 212            itertools::Either::Right(std::iter::empty())
 213        }
 214    }
 215
 216    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 217        if let AgentThreadEntry::ToolCall(ToolCall {
 218            locations,
 219            resolved_locations,
 220            ..
 221        }) = self
 222        {
 223            Some((
 224                locations.get(ix)?.clone(),
 225                resolved_locations.get(ix)?.clone()?,
 226            ))
 227        } else {
 228            None
 229        }
 230    }
 231}
 232
/// Client-side state of a tool call reported over ACP.
#[derive(Debug)]
pub struct ToolCall {
    /// Identifier of the tool call.
    pub id: acp::ToolCallId,
    /// Markdown entity holding the title shown for the call.
    pub label: Entity<Markdown>,
    /// The ACP tool kind; it influences how titles are turned into the label.
    pub kind: acp::ToolKind,
    /// Content produced by the call (content blocks, diffs, terminals).
    pub content: Vec<ToolCallContent>,
    /// Current lifecycle status.
    pub status: ToolCallStatus,
    /// Locations reported by the agent for this call.
    pub locations: Vec<acp::ToolCallLocation>,
    /// Resolved counterparts of `locations`, looked up by the same index;
    /// starts out empty when the call is built from ACP.
    pub resolved_locations: Vec<Option<AgentLocation>>,
    /// Raw input of the call, if provided.
    pub raw_input: Option<serde_json::Value>,
    /// Markdown rendering of `raw_input`, when one could be produced.
    pub raw_input_markdown: Option<Entity<Markdown>>,
    /// Raw output of the call, if provided.
    pub raw_output: Option<serde_json::Value>,
    /// Programmatic tool name read from the ACP meta, if present.
    pub tool_name: Option<SharedString>,
    /// Subagent session details read from the ACP meta, if present.
    pub subagent_session_info: Option<SubagentSessionInfo>,
}
 248
 249impl ToolCall {
 250    fn from_acp(
 251        tool_call: acp::ToolCall,
 252        status: ToolCallStatus,
 253        language_registry: Arc<LanguageRegistry>,
 254        path_style: PathStyle,
 255        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 256        cx: &mut App,
 257    ) -> Result<Self> {
 258        let title = if tool_call.kind == acp::ToolKind::Execute {
 259            tool_call.title
 260        } else if tool_call.kind == acp::ToolKind::Edit {
 261            MarkdownEscaped(tool_call.title.as_str()).to_string()
 262        } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
 263            first_line.to_owned() + ""
 264        } else {
 265            tool_call.title
 266        };
 267        let mut content = Vec::with_capacity(tool_call.content.len());
 268        for item in tool_call.content {
 269            if let Some(item) = ToolCallContent::from_acp(
 270                item,
 271                language_registry.clone(),
 272                path_style,
 273                terminals,
 274                cx,
 275            )? {
 276                content.push(item);
 277            }
 278        }
 279
 280        let raw_input_markdown = tool_call
 281            .raw_input
 282            .as_ref()
 283            .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
 284
 285        let tool_name = tool_name_from_meta(&tool_call.meta);
 286
 287        let subagent_session_info = subagent_session_info_from_meta(&tool_call.meta);
 288
 289        let result = Self {
 290            id: tool_call.tool_call_id,
 291            label: cx
 292                .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
 293            kind: tool_call.kind,
 294            content,
 295            locations: tool_call.locations,
 296            resolved_locations: Vec::default(),
 297            status,
 298            raw_input: tool_call.raw_input,
 299            raw_input_markdown,
 300            raw_output: tool_call.raw_output,
 301            tool_name,
 302            subagent_session_info,
 303        };
 304        Ok(result)
 305    }
 306
 307    fn update_fields(
 308        &mut self,
 309        fields: acp::ToolCallUpdateFields,
 310        meta: Option<acp::Meta>,
 311        language_registry: Arc<LanguageRegistry>,
 312        path_style: PathStyle,
 313        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 314        cx: &mut App,
 315    ) -> Result<()> {
 316        let acp::ToolCallUpdateFields {
 317            kind,
 318            status,
 319            title,
 320            content,
 321            locations,
 322            raw_input,
 323            raw_output,
 324            ..
 325        } = fields;
 326
 327        if let Some(kind) = kind {
 328            self.kind = kind;
 329        }
 330
 331        if let Some(status) = status {
 332            self.status = status.into();
 333        }
 334
 335        if let Some(subagent_session_info) = subagent_session_info_from_meta(&meta) {
 336            self.subagent_session_info = Some(subagent_session_info);
 337        }
 338
 339        if let Some(title) = title {
 340            if self.kind == acp::ToolKind::Execute {
 341                for terminal in self.terminals() {
 342                    terminal.update(cx, |terminal, cx| {
 343                        terminal.update_command_label(&title, cx);
 344                    });
 345                }
 346            }
 347            self.label.update(cx, |label, cx| {
 348                if self.kind == acp::ToolKind::Execute {
 349                    label.replace(title, cx);
 350                } else if self.kind == acp::ToolKind::Edit {
 351                    label.replace(MarkdownEscaped(&title).to_string(), cx)
 352                } else if let Some((first_line, _)) = title.split_once("\n") {
 353                    label.replace(first_line.to_owned() + "", cx);
 354                } else {
 355                    label.replace(title, cx);
 356                }
 357            });
 358        }
 359
 360        if let Some(content) = content {
 361            let mut new_content_len = content.len();
 362            let mut content = content.into_iter();
 363
 364            // Reuse existing content if we can
 365            for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
 366                let valid_content =
 367                    old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
 368                if !valid_content {
 369                    new_content_len -= 1;
 370                }
 371            }
 372            for new in content {
 373                if let Some(new) = ToolCallContent::from_acp(
 374                    new,
 375                    language_registry.clone(),
 376                    path_style,
 377                    terminals,
 378                    cx,
 379                )? {
 380                    self.content.push(new);
 381                } else {
 382                    new_content_len -= 1;
 383                }
 384            }
 385            self.content.truncate(new_content_len);
 386        }
 387
 388        if let Some(locations) = locations {
 389            self.locations = locations;
 390        }
 391
 392        if let Some(raw_input) = raw_input {
 393            self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
 394            self.raw_input = Some(raw_input);
 395        }
 396
 397        if let Some(raw_output) = raw_output {
 398            if self.content.is_empty()
 399                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 400            {
 401                self.content
 402                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 403                        markdown,
 404                    }));
 405            }
 406            self.raw_output = Some(raw_output);
 407        }
 408        Ok(())
 409    }
 410
 411    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 412        self.content.iter().filter_map(|content| match content {
 413            ToolCallContent::Diff(diff) => Some(diff),
 414            ToolCallContent::ContentBlock(_) => None,
 415            ToolCallContent::Terminal(_) => None,
 416        })
 417    }
 418
 419    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 420        self.content.iter().filter_map(|content| match content {
 421            ToolCallContent::Terminal(terminal) => Some(terminal),
 422            ToolCallContent::ContentBlock(_) => None,
 423            ToolCallContent::Diff(_) => None,
 424        })
 425    }
 426
 427    pub fn is_subagent(&self) -> bool {
 428        self.tool_name.as_ref().is_some_and(|s| s == "spawn_agent")
 429            || self.subagent_session_info.is_some()
 430    }
 431
 432    pub fn to_markdown(&self, cx: &App) -> String {
 433        let mut markdown = format!(
 434            "**Tool Call: {}**\nStatus: {}\n\n",
 435            self.label.read(cx).source(),
 436            self.status
 437        );
 438        for content in &self.content {
 439            markdown.push_str(content.to_markdown(cx).as_str());
 440            markdown.push_str("\n\n");
 441        }
 442        markdown
 443    }
 444
 445    async fn resolve_location(
 446        location: acp::ToolCallLocation,
 447        project: WeakEntity<Project>,
 448        cx: &mut AsyncApp,
 449    ) -> Option<ResolvedLocation> {
 450        let buffer = project
 451            .update(cx, |project, cx| {
 452                project
 453                    .project_path_for_absolute_path(&location.path, cx)
 454                    .map(|path| project.open_buffer(path, cx))
 455            })
 456            .ok()??;
 457        let buffer = buffer.await.log_err()?;
 458        let position = buffer.update(cx, |buffer, _| {
 459            let snapshot = buffer.snapshot();
 460            if let Some(row) = location.line {
 461                let column = snapshot.indent_size_for_line(row).len;
 462                let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 463                snapshot.anchor_before(point)
 464            } else {
 465                Anchor::min_for_buffer(snapshot.remote_id())
 466            }
 467        });
 468
 469        Some(ResolvedLocation { buffer, position })
 470    }
 471
 472    fn resolve_locations(
 473        &self,
 474        project: Entity<Project>,
 475        cx: &mut App,
 476    ) -> Task<Vec<Option<ResolvedLocation>>> {
 477        let locations = self.locations.clone();
 478        project.update(cx, |_, cx| {
 479            cx.spawn(async move |project, cx| {
 480                let mut new_locations = Vec::new();
 481                for location in locations {
 482                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 483                }
 484                new_locations
 485            })
 486        })
 487    }
 488}
 489
// Separate so we can hold a strong reference to the buffer
// for saving on the thread
/// A tool call location resolved to an open buffer and a position in it.
///
/// Converts into an [`AgentLocation`], which only keeps a weak buffer handle.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ResolvedLocation {
    /// Strong handle to the buffer containing the location.
    buffer: Entity<Buffer>,
    /// Anchor of the location within `buffer`.
    position: Anchor,
}
 497
 498impl From<&ResolvedLocation> for AgentLocation {
 499    fn from(value: &ResolvedLocation) -> Self {
 500        Self {
 501            buffer: value.buffer.downgrade(),
 502            position: value.position,
 503        }
 504    }
 505}
 506
/// Extra parameters that can accompany a selected permission option.
#[derive(Debug, Clone)]
pub enum SelectedPermissionParams {
    /// Parameters for a terminal permission.
    // NOTE(review): `patterns` presumably lists the command patterns the
    // selection applies to; confirm with the code that consumes it.
    Terminal { patterns: Vec<String> },
}
 511
/// The permission option a user selected, plus optional parameters.
///
/// Converting into `acp::SelectedPermissionOutcome` keeps only `option_id`.
#[derive(Debug)]
pub struct SelectedPermissionOutcome {
    /// Identifier of the selected option.
    pub option_id: acp::PermissionOptionId,
    /// Kind of the selected option.
    pub option_kind: acp::PermissionOptionKind,
    /// Optional extra parameters for the selection.
    pub params: Option<SelectedPermissionParams>,
}
 518
 519impl SelectedPermissionOutcome {
 520    pub fn new(option_id: acp::PermissionOptionId, option_kind: acp::PermissionOptionKind) -> Self {
 521        Self {
 522            option_id,
 523            option_kind,
 524            params: None,
 525        }
 526    }
 527
 528    pub fn params(mut self, params: Option<SelectedPermissionParams>) -> Self {
 529        self.params = params;
 530        self
 531    }
 532}
 533
 534impl From<SelectedPermissionOutcome> for acp::SelectedPermissionOutcome {
 535    fn from(value: SelectedPermissionOutcome) -> Self {
 536        Self::new(value.option_id)
 537    }
 538}
 539
/// Result of a permission request.
#[derive(Debug)]
pub enum RequestPermissionOutcome {
    /// The request was cancelled without a selection.
    Cancelled,
    /// An option was selected.
    Selected(SelectedPermissionOutcome),
}
 545
 546impl From<RequestPermissionOutcome> for acp::RequestPermissionOutcome {
 547    fn from(value: RequestPermissionOutcome) -> Self {
 548        match value {
 549            RequestPermissionOutcome::Cancelled => Self::Cancelled,
 550            RequestPermissionOutcome::Selected(outcome) => Self::Selected(outcome.into()),
 551        }
 552    }
 553}
 554
/// Lifecycle status of a [`ToolCall`] as tracked by this client.
///
/// `Pending`, `InProgress`, `Completed` and `Failed` are produced by the
/// `From<acp::ToolCallStatus>` conversion; the remaining variants are set by
/// other code.
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The tool call hasn't started running yet, but we start showing it to
    /// the user.
    Pending,
    /// The tool call is waiting for confirmation from the user.
    WaitingForConfirmation {
        /// The options offered to the user.
        options: PermissionOptions,
        /// Channel used to send back the option the user selected.
        respond_tx: oneshot::Sender<SelectedPermissionOutcome>,
    },
    /// The tool call is currently running.
    InProgress,
    /// The tool call completed successfully.
    Completed,
    /// The tool call failed.
    Failed,
    /// The user rejected the tool call.
    Rejected,
    /// The user canceled generation so the tool call was canceled.
    Canceled,
}
 576
 577impl From<acp::ToolCallStatus> for ToolCallStatus {
 578    fn from(status: acp::ToolCallStatus) -> Self {
 579        match status {
 580            acp::ToolCallStatus::Pending => Self::Pending,
 581            acp::ToolCallStatus::InProgress => Self::InProgress,
 582            acp::ToolCallStatus::Completed => Self::Completed,
 583            acp::ToolCallStatus::Failed => Self::Failed,
 584            _ => Self::Pending,
 585        }
 586    }
 587}
 588
 589impl Display for ToolCallStatus {
 590    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 591        write!(
 592            f,
 593            "{}",
 594            match self {
 595                ToolCallStatus::Pending => "Pending",
 596                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 597                ToolCallStatus::InProgress => "In Progress",
 598                ToolCallStatus::Completed => "Completed",
 599                ToolCallStatus::Failed => "Failed",
 600                ToolCallStatus::Rejected => "Rejected",
 601                ToolCallStatus::Canceled => "Canceled",
 602            }
 603        )
 604    }
 605}
 606
/// Renderable content accumulated from one or more ACP content blocks.
#[derive(Debug, PartialEq, Clone)]
pub enum ContentBlock {
    /// Nothing has been appended yet.
    Empty,
    /// Markdown text; further blocks are appended to this entity.
    Markdown { markdown: Entity<Markdown> },
    /// A single resource link that has not been combined with other content.
    ResourceLink { resource_link: acp::ResourceLink },
    /// A single successfully decoded image.
    Image { image: Arc<gpui::Image> },
}
 614
 615impl ContentBlock {
 616    pub fn new(
 617        block: acp::ContentBlock,
 618        language_registry: &Arc<LanguageRegistry>,
 619        path_style: PathStyle,
 620        cx: &mut App,
 621    ) -> Self {
 622        let mut this = Self::Empty;
 623        this.append(block, language_registry, path_style, cx);
 624        this
 625    }
 626
 627    pub fn new_combined(
 628        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 629        language_registry: Arc<LanguageRegistry>,
 630        path_style: PathStyle,
 631        cx: &mut App,
 632    ) -> Self {
 633        let mut this = Self::Empty;
 634        for block in blocks {
 635            this.append(block, &language_registry, path_style, cx);
 636        }
 637        this
 638    }
 639
 640    pub fn append(
 641        &mut self,
 642        block: acp::ContentBlock,
 643        language_registry: &Arc<LanguageRegistry>,
 644        path_style: PathStyle,
 645        cx: &mut App,
 646    ) {
 647        match (&mut *self, &block) {
 648            (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
 649                *self = ContentBlock::ResourceLink {
 650                    resource_link: resource_link.clone(),
 651                };
 652            }
 653            (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
 654                if let Some(image) = Self::decode_image(image_content) {
 655                    *self = ContentBlock::Image { image };
 656                } else {
 657                    let new_content = Self::image_md(image_content);
 658                    *self = Self::create_markdown_block(new_content, language_registry, cx);
 659                }
 660            }
 661            (ContentBlock::Empty, _) => {
 662                let new_content = Self::block_string_contents(&block, path_style);
 663                *self = Self::create_markdown_block(new_content, language_registry, cx);
 664            }
 665            (ContentBlock::Markdown { markdown }, _) => {
 666                let new_content = Self::block_string_contents(&block, path_style);
 667                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 668            }
 669            (ContentBlock::ResourceLink { resource_link }, _) => {
 670                let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
 671                let new_content = Self::block_string_contents(&block, path_style);
 672                let combined = format!("{}\n{}", existing_content, new_content);
 673                *self = Self::create_markdown_block(combined, language_registry, cx);
 674            }
 675            (ContentBlock::Image { .. }, _) => {
 676                let new_content = Self::block_string_contents(&block, path_style);
 677                let combined = format!("`Image`\n{}", new_content);
 678                *self = Self::create_markdown_block(combined, language_registry, cx);
 679            }
 680        }
 681    }
 682
 683    fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
 684        use base64::Engine as _;
 685
 686        let bytes = base64::engine::general_purpose::STANDARD
 687            .decode(image_content.data.as_bytes())
 688            .ok()?;
 689        let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
 690        Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
 691    }
 692
 693    fn create_markdown_block(
 694        content: String,
 695        language_registry: &Arc<LanguageRegistry>,
 696        cx: &mut App,
 697    ) -> ContentBlock {
 698        ContentBlock::Markdown {
 699            markdown: cx
 700                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 701        }
 702    }
 703
 704    fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
 705        match block {
 706            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 707            acp::ContentBlock::ResourceLink(resource_link) => {
 708                Self::resource_link_md(&resource_link.uri, path_style)
 709            }
 710            acp::ContentBlock::Resource(acp::EmbeddedResource {
 711                resource:
 712                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 713                        uri,
 714                        ..
 715                    }),
 716                ..
 717            }) => Self::resource_link_md(uri, path_style),
 718            acp::ContentBlock::Image(image) => Self::image_md(image),
 719            _ => String::new(),
 720        }
 721    }
 722
 723    fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
 724        if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
 725            uri.as_link().to_string()
 726        } else {
 727            uri.to_string()
 728        }
 729    }
 730
 731    fn image_md(_image: &acp::ImageContent) -> String {
 732        "`Image`".into()
 733    }
 734
 735    pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 736        match self {
 737            ContentBlock::Empty => "",
 738            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 739            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 740            ContentBlock::Image { .. } => "`Image`",
 741        }
 742    }
 743
 744    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 745        match self {
 746            ContentBlock::Empty => None,
 747            ContentBlock::Markdown { markdown } => Some(markdown),
 748            ContentBlock::ResourceLink { .. } => None,
 749            ContentBlock::Image { .. } => None,
 750        }
 751    }
 752
 753    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 754        match self {
 755            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 756            _ => None,
 757        }
 758    }
 759
 760    pub fn image(&self) -> Option<&Arc<gpui::Image>> {
 761        match self {
 762            ContentBlock::Image { image } => Some(image),
 763            _ => None,
 764        }
 765    }
 766}
 767
/// A piece of content attached to a [`ToolCall`].
#[derive(Debug)]
pub enum ToolCallContent {
    /// Text, resource link, or image content.
    ContentBlock(ContentBlock),
    /// A diff entity.
    Diff(Entity<Diff>),
    /// A terminal entity.
    Terminal(Entity<Terminal>),
}
 774
 775impl ToolCallContent {
 776    pub fn from_acp(
 777        content: acp::ToolCallContent,
 778        language_registry: Arc<LanguageRegistry>,
 779        path_style: PathStyle,
 780        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 781        cx: &mut App,
 782    ) -> Result<Option<Self>> {
 783        match content {
 784            acp::ToolCallContent::Content(acp::Content { content, .. }) => {
 785                Ok(Some(Self::ContentBlock(ContentBlock::new(
 786                    content,
 787                    &language_registry,
 788                    path_style,
 789                    cx,
 790                ))))
 791            }
 792            acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
 793                Diff::finalized(
 794                    diff.path.to_string_lossy().into_owned(),
 795                    diff.old_text,
 796                    diff.new_text,
 797                    language_registry,
 798                    cx,
 799                )
 800            })))),
 801            acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
 802                .get(&terminal_id)
 803                .cloned()
 804                .map(|terminal| Some(Self::Terminal(terminal)))
 805                .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
 806            _ => Ok(None),
 807        }
 808    }
 809
 810    pub fn update_from_acp(
 811        &mut self,
 812        new: acp::ToolCallContent,
 813        language_registry: Arc<LanguageRegistry>,
 814        path_style: PathStyle,
 815        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 816        cx: &mut App,
 817    ) -> Result<bool> {
 818        let needs_update = match (&self, &new) {
 819            (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
 820                old_diff.read(cx).needs_update(
 821                    new_diff.old_text.as_deref().unwrap_or(""),
 822                    &new_diff.new_text,
 823                    cx,
 824                )
 825            }
 826            _ => true,
 827        };
 828
 829        if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
 830            if needs_update {
 831                *self = update;
 832            }
 833            Ok(true)
 834        } else {
 835            Ok(false)
 836        }
 837    }
 838
 839    pub fn to_markdown(&self, cx: &App) -> String {
 840        match self {
 841            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 842            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 843            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 844        }
 845    }
 846
 847    pub fn image(&self) -> Option<&Arc<gpui::Image>> {
 848        match self {
 849            Self::ContentBlock(content) => content.image(),
 850            _ => None,
 851        }
 852    }
 853}
 854
/// An update targeting an existing tool call, identified by its id.
#[derive(Debug, PartialEq)]
pub enum ToolCallUpdate {
    /// A field update as received over ACP.
    UpdateFields(acp::ToolCallUpdate),
    /// An update carrying a diff entity.
    UpdateDiff(ToolCallUpdateDiff),
    /// An update carrying a terminal entity.
    UpdateTerminal(ToolCallUpdateTerminal),
}
 861
impl ToolCallUpdate {
    /// Returns the id of the tool call this update targets, regardless of
    /// which variant carries it.
    fn id(&self) -> &acp::ToolCallId {
        match self {
            Self::UpdateFields(update) => &update.tool_call_id,
            Self::UpdateDiff(diff) => &diff.id,
            Self::UpdateTerminal(terminal) => &terminal.id,
        }
    }
}
 871
/// Wraps a raw ACP update as [`ToolCallUpdate::UpdateFields`].
impl From<acp::ToolCallUpdate> for ToolCallUpdate {
    fn from(update: acp::ToolCallUpdate) -> Self {
        Self::UpdateFields(update)
    }
}
 877
/// Wraps a diff update as [`ToolCallUpdate::UpdateDiff`].
impl From<ToolCallUpdateDiff> for ToolCallUpdate {
    fn from(diff: ToolCallUpdateDiff) -> Self {
        Self::UpdateDiff(diff)
    }
}
 883
/// Payload of [`ToolCallUpdate::UpdateDiff`].
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateDiff {
    /// Id of the tool call being updated.
    pub id: acp::ToolCallId,
    /// The diff entity associated with the update.
    pub diff: Entity<Diff>,
}
 889
/// Wraps a terminal update as [`ToolCallUpdate::UpdateTerminal`].
impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
    fn from(terminal: ToolCallUpdateTerminal) -> Self {
        Self::UpdateTerminal(terminal)
    }
}
 895
/// Payload of [`ToolCallUpdate::UpdateTerminal`].
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateTerminal {
    /// Id of the tool call being updated.
    pub id: acp::ToolCallId,
    /// The terminal entity associated with the update.
    pub terminal: Entity<Terminal>,
}
 901
/// An ordered list of plan entries. The default plan is empty.
#[derive(Debug, Default)]
pub struct Plan {
    /// The plan's entries, in order.
    pub entries: Vec<PlanEntry>,
}
 906
/// Summary counts over a [`Plan`], as computed by [`Plan::stats`].
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry found with an in-progress status, if any.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Number of entries that are pending or in progress.
    pub pending: u32,
    /// Number of completed entries.
    pub completed: u32,
}
 913
 914impl Plan {
 915    pub fn is_empty(&self) -> bool {
 916        self.entries.is_empty()
 917    }
 918
 919    pub fn stats(&self) -> PlanStats<'_> {
 920        let mut stats = PlanStats {
 921            in_progress_entry: None,
 922            pending: 0,
 923            completed: 0,
 924        };
 925
 926        for entry in &self.entries {
 927            match &entry.status {
 928                acp::PlanEntryStatus::Pending => {
 929                    stats.pending += 1;
 930                }
 931                acp::PlanEntryStatus::InProgress => {
 932                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 933                    stats.pending += 1;
 934                }
 935                acp::PlanEntryStatus::Completed => {
 936                    stats.completed += 1;
 937                }
 938                _ => {}
 939            }
 940        }
 941
 942        stats
 943    }
 944}
 945
/// A single entry of a [`Plan`].
#[derive(Debug)]
pub struct PlanEntry {
    /// The entry's text, held as a Markdown entity.
    pub content: Entity<Markdown>,
    /// Priority as reported over ACP.
    pub priority: acp::PlanEntryPriority,
    /// Status as reported over ACP.
    pub status: acp::PlanEntryStatus,
}
 952
 953impl PlanEntry {
 954    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 955        Self {
 956            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 957            priority: entry.priority,
 958            status: entry.status,
 959        }
 960    }
 961}
 962
/// Token accounting for a thread.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TokenUsage {
    /// The token limit; `ratio` treats `0` as "unknown" and never warns.
    pub max_tokens: u64,
    /// Tokens used so far; compared against `max_tokens` by `ratio`.
    pub used_tokens: u64,
    /// Input token count.
    pub input_tokens: u64,
    /// Output token count.
    pub output_tokens: u64,
    /// Limit on output tokens, when known.
    pub max_output_tokens: Option<u64>,
}
 971
/// Fraction of `max_tokens` at or above which [`TokenUsage::ratio`] reports
/// [`TokenUsageRatio::Warning`].
pub const TOKEN_USAGE_WARNING_THRESHOLD: f32 = 0.8;
 973
 974impl TokenUsage {
 975    pub fn ratio(&self) -> TokenUsageRatio {
 976        #[cfg(debug_assertions)]
 977        let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
 978            .unwrap_or(TOKEN_USAGE_WARNING_THRESHOLD.to_string())
 979            .parse()
 980            .unwrap();
 981        #[cfg(not(debug_assertions))]
 982        let warning_threshold: f32 = TOKEN_USAGE_WARNING_THRESHOLD;
 983
 984        // When the maximum is unknown because there is no selected model,
 985        // avoid showing the token limit warning.
 986        if self.max_tokens == 0 {
 987            TokenUsageRatio::Normal
 988        } else if self.used_tokens >= self.max_tokens {
 989            TokenUsageRatio::Exceeded
 990        } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
 991            TokenUsageRatio::Warning
 992        } else {
 993            TokenUsageRatio::Normal
 994        }
 995    }
 996}
 997
/// Severity of token usage as classified by [`TokenUsage::ratio`].
///
/// The derived ordering follows declaration order:
/// `Normal < Warning < Exceeded`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum TokenUsageRatio {
    /// Below the warning threshold, or the maximum is unknown.
    Normal,
    /// At or above the warning threshold but below the maximum.
    Warning,
    /// Used tokens have reached or passed the maximum.
    Exceeded,
}
1004
/// Information about a retry, carried by [`AcpThreadEvent::Retry`].
#[derive(Debug, Clone)]
pub struct RetryStatus {
    /// Message of the most recent error.
    pub last_error: SharedString,
    /// Current attempt number.
    pub attempt: usize,
    /// Maximum number of attempts.
    pub max_attempts: usize,
    /// Instant associated with the start of the retry.
    pub started_at: Instant,
    /// Duration associated with the retry.
    // NOTE(review): presumably the delay before the next attempt — confirm against the producer.
    pub duration: Duration,
}
1013
/// Bookkeeping for the turn currently in flight; a thread reports
/// `ThreadStatus::Generating` while one is stored.
struct RunningTurn {
    /// Identifier of the turn.
    id: u32,
    /// The task associated with sending this turn.
    send_task: Task<()>,
}
1018
/// State of one agent session: its entries, plan, connection, and the
/// bookkeeping needed to stream updates into the UI.
pub struct AcpThread {
    session_id: acp::SessionId,
    work_dirs: Option<PathList>,
    parent_session_id: Option<acp::SessionId>,
    /// The thread's title; `title()` falls back to `provisional_title` when unset.
    title: Option<SharedString>,
    /// A display-only title that is cleared once a real title is set.
    provisional_title: Option<SharedString>,
    entries: Vec<AgentThreadEntry>,
    plan: Plan,
    project: Entity<Project>,
    action_log: Entity<ActionLog>,
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    turn_id: u32,
    /// `Some` while a turn is in flight; drives `status()`.
    running_turn: Option<RunningTurn>,
    connection: Rc<dyn AgentConnection>,
    token_usage: Option<TokenUsage>,
    prompt_capabilities: acp::PromptCapabilities,
    available_commands: Vec<acp::AvailableCommand>,
    /// Keeps alive the task that mirrors prompt capability changes into this thread.
    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
    had_error: bool,
    /// The user's unsent prompt text, persisted so it can be restored when reloading the thread.
    draft_prompt: Option<Vec<acp::ContentBlock>>,
    /// The initial scroll position for the thread view, set during session registration.
    ui_scroll_position: Option<gpui::ListOffset>,
    /// Buffer for smooth text streaming. Holds text that has been received from
    /// the model but not yet revealed in the UI. A timer task drains this buffer
    /// gradually to create a fluid typing effect instead of choppy chunk-at-a-time
    /// updates.
    streaming_text_buffer: Option<StreamingTextBuffer>,
}
1051
/// Text waiting to be revealed gradually in a Markdown entity.
struct StreamingTextBuffer {
    /// Text received from the model but not yet appended to the Markdown source.
    pending: String,
    /// The number of bytes to reveal per timer turn.
    bytes_to_reveal_per_tick: usize,
    /// The Markdown entity being streamed into.
    target: Entity<Markdown>,
    /// Timer task that periodically moves text from `pending` into `target`.
    _reveal_task: Task<()>,
}
1062
/// Timing constants used to derive how many bytes are revealed per tick.
impl StreamingTextBuffer {
    /// The number of milliseconds between each timer tick, controlling how quickly
    /// text is revealed.
    const TASK_UPDATE_MS: u64 = 16;
    /// The time in milliseconds to reveal the entire pending text.
    const REVEAL_TARGET: f32 = 200.0;
}
1070
/// Builds action-log telemetry metadata from a thread: the connection's
/// telemetry id plus the thread's session id.
impl From<&AcpThread> for ActionLogTelemetry {
    fn from(value: &AcpThread) -> Self {
        Self {
            agent_telemetry_id: value.connection().telemetry_id(),
            session_id: value.session_id.0.clone(),
        }
    }
}
1079
/// Events emitted by [`AcpThread`].
#[derive(Debug)]
pub enum AcpThreadEvent {
    /// A new entry was appended to the thread.
    NewEntry,
    /// The title or provisional title changed.
    TitleUpdated,
    /// The stored token usage was replaced.
    TokenUsageUpdated,
    /// The entry at the given index was updated.
    EntryUpdated(usize),
    EntriesRemoved(Range<usize>),
    ToolAuthorizationRequested(acp::ToolCallId),
    ToolAuthorizationReceived(acp::ToolCallId),
    /// A retry status was reported.
    Retry(RetryStatus),
    /// A subagent session was spawned.
    SubagentSpawned(acp::SessionId),
    Stopped(acp::StopReason),
    Error,
    LoadError(LoadError),
    /// New prompt capabilities were received and stored.
    PromptCapabilitiesUpdated,
    Refusal,
    /// The list of available commands was replaced.
    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
    /// The session's current mode changed.
    ModeUpdated(acp::SessionModeId),
    /// The session's config options changed.
    ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
    /// The thread's working directories were set.
    WorkingDirectoriesUpdated,
}
1101
// Lets `AcpThread` emit `AcpThreadEvent`s through its context.
impl EventEmitter<AcpThreadEvent> for AcpThread {}
1103
/// Events about a terminal, each tagged with the terminal's id.
// NOTE(review): presumably produced by an external terminal provider and consumed by the thread — confirm against callers.
#[derive(Debug, Clone)]
pub enum TerminalProviderEvent {
    /// A terminal was created.
    Created {
        terminal_id: acp::TerminalId,
        label: String,
        cwd: Option<PathBuf>,
        output_byte_limit: Option<u64>,
        terminal: Entity<::terminal::Terminal>,
    },
    /// Raw output bytes for a terminal.
    Output {
        terminal_id: acp::TerminalId,
        data: Vec<u8>,
    },
    /// The terminal's title changed.
    TitleChanged {
        terminal_id: acp::TerminalId,
        title: String,
    },
    /// The terminal exited with the given status.
    Exit {
        terminal_id: acp::TerminalId,
        status: acp::TerminalExitStatus,
    },
}
1126
/// Commands addressed to a terminal, each tagged with the terminal's id.
#[derive(Debug, Clone)]
pub enum TerminalProviderCommand {
    /// Write the given bytes as input.
    WriteInput {
        terminal_id: acp::TerminalId,
        bytes: Vec<u8>,
    },
    /// Resize to the given number of columns and rows.
    Resize {
        terminal_id: acp::TerminalId,
        cols: u16,
        rows: u16,
    },
    /// Close the terminal.
    Close {
        terminal_id: acp::TerminalId,
    },
}
1142
/// Whether a thread currently has a turn in flight; see `AcpThread::status`.
#[derive(PartialEq, Eq, Debug)]
pub enum ThreadStatus {
    /// No turn is running.
    Idle,
    /// A turn is running.
    Generating,
}
1148
/// Reasons a load can fail; rendered for display by the `Display` impl.
#[derive(Debug, Clone)]
pub enum LoadError {
    /// The found version is older than the minimum supported version.
    Unsupported {
        command: SharedString,
        current_version: SharedString,
        minimum_version: SharedString,
    },
    /// Installation failed; carries the failure message.
    FailedToInstall(SharedString),
    /// The server process exited with the given status.
    Exited {
        status: ExitStatus,
    },
    /// Any other failure, described by its message.
    Other(SharedString),
}
1162
1163impl Display for LoadError {
1164    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1165        match self {
1166            LoadError::Unsupported {
1167                command: path,
1168                current_version,
1169                minimum_version,
1170            } => {
1171                write!(
1172                    f,
1173                    "version {current_version} from {path} is not supported (need at least {minimum_version})"
1174                )
1175            }
1176            LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1177            LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1178            LoadError::Other(msg) => write!(f, "{msg}"),
1179        }
1180    }
1181}
1182
// Marks `LoadError` as a standard error type; it relies on the trait's default methods.
impl Error for LoadError {}
1184
1185impl AcpThread {
1186    pub fn new(
1187        parent_session_id: Option<acp::SessionId>,
1188        title: Option<SharedString>,
1189        work_dirs: Option<PathList>,
1190        connection: Rc<dyn AgentConnection>,
1191        project: Entity<Project>,
1192        action_log: Entity<ActionLog>,
1193        session_id: acp::SessionId,
1194        mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1195        cx: &mut Context<Self>,
1196    ) -> Self {
1197        let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1198        let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1199            loop {
1200                let caps = prompt_capabilities_rx.recv().await?;
1201                this.update(cx, |this, cx| {
1202                    this.prompt_capabilities = caps;
1203                    cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1204                })?;
1205            }
1206        });
1207
1208        Self {
1209            parent_session_id,
1210            work_dirs,
1211            action_log,
1212            shared_buffers: Default::default(),
1213            entries: Default::default(),
1214            plan: Default::default(),
1215            title,
1216            provisional_title: None,
1217            project,
1218            running_turn: None,
1219            turn_id: 0,
1220            connection,
1221            session_id,
1222            token_usage: None,
1223            prompt_capabilities,
1224            available_commands: Vec::new(),
1225            _observe_prompt_capabilities: task,
1226            terminals: HashMap::default(),
1227            pending_terminal_output: HashMap::default(),
1228            pending_terminal_exit: HashMap::default(),
1229            had_error: false,
1230            draft_prompt: None,
1231            ui_scroll_position: None,
1232            streaming_text_buffer: None,
1233        }
1234    }
1235
    /// Returns the parent session's id, if this thread has one.
    pub fn parent_session_id(&self) -> Option<&acp::SessionId> {
        self.parent_session_id.as_ref()
    }
1239
    /// Returns a clone of the most recently stored prompt capabilities.
    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
        self.prompt_capabilities.clone()
    }
1243
    /// Returns the currently stored list of available commands.
    pub fn available_commands(&self) -> &[acp::AvailableCommand] {
        &self.available_commands
    }
1247
    /// Returns the stored draft prompt, if one is set.
    pub fn draft_prompt(&self) -> Option<&[acp::ContentBlock]> {
        self.draft_prompt.as_deref()
    }
1251
    /// Replaces the stored draft prompt; `None` clears it. No event is emitted.
    pub fn set_draft_prompt(&mut self, prompt: Option<Vec<acp::ContentBlock>>) {
        self.draft_prompt = prompt;
    }
1255
    /// Returns the stored UI scroll position, if one is set.
    pub fn ui_scroll_position(&self) -> Option<gpui::ListOffset> {
        self.ui_scroll_position
    }
1259
    /// Replaces the stored UI scroll position; `None` clears it. No event is emitted.
    pub fn set_ui_scroll_position(&mut self, position: Option<gpui::ListOffset>) {
        self.ui_scroll_position = position;
    }
1263
    /// Returns the agent connection this thread uses.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }
1267
    /// Returns the thread's action log entity.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
1271
    /// Returns the project entity this thread belongs to.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
1275
1276    pub fn title(&self) -> Option<SharedString> {
1277        self.title
1278            .clone()
1279            .or_else(|| self.provisional_title.clone())
1280    }
1281
    /// Returns `true` while a provisional title is stored.
    pub fn has_provisional_title(&self) -> bool {
        self.provisional_title.is_some()
    }
1285
    /// Returns all entries of the thread, in order.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
1289
    /// Returns the id of the session backing this thread.
    pub fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
1293
    /// Returns the thread's working directories, if set.
    pub fn work_dirs(&self) -> Option<&PathList> {
        self.work_dirs.as_ref()
    }
1297
    /// Stores the working directories and emits `WorkingDirectoriesUpdated`.
    /// The event is emitted even if the value is unchanged.
    pub fn set_work_dirs(&mut self, work_dirs: PathList, cx: &mut Context<Self>) {
        self.work_dirs = Some(work_dirs);
        cx.emit(AcpThreadEvent::WorkingDirectoriesUpdated)
    }
1302
1303    pub fn status(&self) -> ThreadStatus {
1304        if self.running_turn.is_some() {
1305            ThreadStatus::Generating
1306        } else {
1307            ThreadStatus::Idle
1308        }
1309    }
1310
    /// Returns the thread's stored error flag.
    pub fn had_error(&self) -> bool {
        self.had_error
    }
1314
1315    pub fn is_waiting_for_confirmation(&self) -> bool {
1316        for entry in self.entries.iter().rev() {
1317            match entry {
1318                AgentThreadEntry::UserMessage(_) => return false,
1319                AgentThreadEntry::ToolCall(ToolCall {
1320                    status: ToolCallStatus::WaitingForConfirmation { .. },
1321                    ..
1322                }) => return true,
1323                AgentThreadEntry::ToolCall(_)
1324                | AgentThreadEntry::AssistantMessage(_)
1325                | AgentThreadEntry::CompletedPlan(_) => {}
1326            }
1327        }
1328        false
1329    }
1330
    /// Returns the stored token usage, if any.
    pub fn token_usage(&self) -> Option<&TokenUsage> {
        self.token_usage.as_ref()
    }
1334
1335    pub fn has_pending_edit_tool_calls(&self) -> bool {
1336        for entry in self.entries.iter().rev() {
1337            match entry {
1338                AgentThreadEntry::UserMessage(_) => return false,
1339                AgentThreadEntry::ToolCall(
1340                    call @ ToolCall {
1341                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1342                        ..
1343                    },
1344                ) if call.diffs().next().is_some() => {
1345                    return true;
1346                }
1347                AgentThreadEntry::ToolCall(_)
1348                | AgentThreadEntry::AssistantMessage(_)
1349                | AgentThreadEntry::CompletedPlan(_) => {}
1350            }
1351        }
1352
1353        false
1354    }
1355
1356    pub fn has_in_progress_tool_calls(&self) -> bool {
1357        for entry in self.entries.iter().rev() {
1358            match entry {
1359                AgentThreadEntry::UserMessage(_) => return false,
1360                AgentThreadEntry::ToolCall(ToolCall {
1361                    status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1362                    ..
1363                }) => {
1364                    return true;
1365                }
1366                AgentThreadEntry::ToolCall(_)
1367                | AgentThreadEntry::AssistantMessage(_)
1368                | AgentThreadEntry::CompletedPlan(_) => {}
1369            }
1370        }
1371
1372        false
1373    }
1374
1375    pub fn used_tools_since_last_user_message(&self) -> bool {
1376        for entry in self.entries.iter().rev() {
1377            match entry {
1378                AgentThreadEntry::UserMessage(..) => return false,
1379                AgentThreadEntry::AssistantMessage(..) | AgentThreadEntry::CompletedPlan(..) => {
1380                    continue;
1381                }
1382                AgentThreadEntry::ToolCall(..) => return true,
1383            }
1384        }
1385
1386        false
1387    }
1388
1389    pub fn handle_session_update(
1390        &mut self,
1391        update: acp::SessionUpdate,
1392        cx: &mut Context<Self>,
1393    ) -> Result<(), acp::Error> {
1394        match update {
1395            acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1396                // We optimistically add the full user prompt before calling `prompt`.
1397                // Some ACP servers echo user chunks back over updates. Skip the chunk if
1398                // it's already present in the current user message to avoid duplicating content.
1399                let already_in_user_message = self
1400                    .entries
1401                    .last()
1402                    .and_then(|entry| entry.user_message())
1403                    .is_some_and(|message| message.chunks.contains(&content));
1404                if !already_in_user_message {
1405                    self.push_user_content_block(None, content, cx);
1406                }
1407            }
1408            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1409                self.push_assistant_content_block(content, false, cx);
1410            }
1411            acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1412                self.push_assistant_content_block(content, true, cx);
1413            }
1414            acp::SessionUpdate::ToolCall(tool_call) => {
1415                self.upsert_tool_call(tool_call, cx)?;
1416            }
1417            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1418                self.update_tool_call(tool_call_update, cx)?;
1419            }
1420            acp::SessionUpdate::Plan(plan) => {
1421                self.update_plan(plan, cx);
1422            }
1423            acp::SessionUpdate::SessionInfoUpdate(info_update) => {
1424                if let acp::MaybeUndefined::Value(title) = info_update.title {
1425                    let had_provisional = self.provisional_title.take().is_some();
1426                    let title: SharedString = title.into();
1427                    if self.title.as_ref() != Some(&title) {
1428                        self.title = Some(title);
1429                        cx.emit(AcpThreadEvent::TitleUpdated);
1430                    } else if had_provisional {
1431                        cx.emit(AcpThreadEvent::TitleUpdated);
1432                    }
1433                }
1434            }
1435            acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1436                available_commands,
1437                ..
1438            }) => {
1439                self.available_commands = available_commands.clone();
1440                cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands));
1441            }
1442            acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1443                current_mode_id,
1444                ..
1445            }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1446            acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1447                config_options,
1448                ..
1449            }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1450            _ => {}
1451        }
1452        Ok(())
1453    }
1454
    /// Appends a user content chunk without indentation; shorthand for
    /// `push_user_content_block_with_indent` with `indented = false`.
    pub fn push_user_content_block(
        &mut self,
        message_id: Option<UserMessageId>,
        chunk: acp::ContentBlock,
        cx: &mut Context<Self>,
    ) {
        self.push_user_content_block_with_indent(message_id, chunk, false, cx)
    }
1463
1464    pub fn push_user_content_block_with_indent(
1465        &mut self,
1466        message_id: Option<UserMessageId>,
1467        chunk: acp::ContentBlock,
1468        indented: bool,
1469        cx: &mut Context<Self>,
1470    ) {
1471        let language_registry = self.project.read(cx).languages().clone();
1472        let path_style = self.project.read(cx).path_style(cx);
1473        let entries_len = self.entries.len();
1474
1475        if let Some(last_entry) = self.entries.last_mut()
1476            && let AgentThreadEntry::UserMessage(UserMessage {
1477                id,
1478                content,
1479                chunks,
1480                indented: existing_indented,
1481                ..
1482            }) = last_entry
1483            && *existing_indented == indented
1484        {
1485            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1486            *id = message_id.or(id.take());
1487            content.append(chunk.clone(), &language_registry, path_style, cx);
1488            chunks.push(chunk);
1489            let idx = entries_len - 1;
1490            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1491        } else {
1492            let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1493            self.push_entry(
1494                AgentThreadEntry::UserMessage(UserMessage {
1495                    id: message_id,
1496                    content,
1497                    chunks: vec![chunk],
1498                    checkpoint: None,
1499                    indented,
1500                }),
1501                cx,
1502            );
1503        }
1504    }
1505
    /// Appends an assistant content chunk without indentation; shorthand for
    /// `push_assistant_content_block_with_indent` with `indented = false`.
    pub fn push_assistant_content_block(
        &mut self,
        chunk: acp::ContentBlock,
        is_thought: bool,
        cx: &mut Context<Self>,
    ) {
        self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
    }
1514
1515    pub fn push_assistant_content_block_with_indent(
1516        &mut self,
1517        chunk: acp::ContentBlock,
1518        is_thought: bool,
1519        indented: bool,
1520        cx: &mut Context<Self>,
1521    ) {
1522        let path_style = self.project.read(cx).path_style(cx);
1523
1524        // For text chunks going to an existing Markdown block, buffer for smooth
1525        // streaming instead of appending all at once which may feel more choppy.
1526        if let acp::ContentBlock::Text(text_content) = &chunk {
1527            if let Some(markdown) = self.streaming_markdown_target(is_thought, indented) {
1528                let entries_len = self.entries.len();
1529                cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
1530                self.buffer_streaming_text(&markdown, text_content.text.clone(), cx);
1531                return;
1532            }
1533        }
1534
1535        let language_registry = self.project.read(cx).languages().clone();
1536        let entries_len = self.entries.len();
1537        if let Some(last_entry) = self.entries.last_mut()
1538            && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1539                chunks,
1540                indented: existing_indented,
1541                is_subagent_output: _,
1542            }) = last_entry
1543            && *existing_indented == indented
1544        {
1545            let idx = entries_len - 1;
1546            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1547            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1548            match (chunks.last_mut(), is_thought) {
1549                (Some(AssistantMessageChunk::Message { block }), false)
1550                | (Some(AssistantMessageChunk::Thought { block }), true) => {
1551                    block.append(chunk, &language_registry, path_style, cx)
1552                }
1553                _ => {
1554                    let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1555                    if is_thought {
1556                        chunks.push(AssistantMessageChunk::Thought { block })
1557                    } else {
1558                        chunks.push(AssistantMessageChunk::Message { block })
1559                    }
1560                }
1561            }
1562        } else {
1563            let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1564            let chunk = if is_thought {
1565                AssistantMessageChunk::Thought { block }
1566            } else {
1567                AssistantMessageChunk::Message { block }
1568            };
1569
1570            self.push_entry(
1571                AgentThreadEntry::AssistantMessage(AssistantMessage {
1572                    chunks: vec![chunk],
1573                    indented,
1574                    is_subagent_output: false,
1575                }),
1576                cx,
1577            );
1578        }
1579    }
1580
1581    fn streaming_markdown_target(
1582        &self,
1583        is_thought: bool,
1584        indented: bool,
1585    ) -> Option<Entity<Markdown>> {
1586        let last_entry = self.entries.last()?;
1587        if let AgentThreadEntry::AssistantMessage(AssistantMessage {
1588            chunks,
1589            indented: existing_indented,
1590            ..
1591        }) = last_entry
1592            && *existing_indented == indented
1593            && let [.., chunk] = chunks.as_slice()
1594        {
1595            match (chunk, is_thought) {
1596                (
1597                    AssistantMessageChunk::Message {
1598                        block: ContentBlock::Markdown { markdown },
1599                    },
1600                    false,
1601                )
1602                | (
1603                    AssistantMessageChunk::Thought {
1604                        block: ContentBlock::Markdown { markdown },
1605                    },
1606                    true,
1607                ) => Some(markdown.clone()),
1608                _ => None,
1609            }
1610        } else {
1611            None
1612        }
1613    }
1614
1615    /// Add text to the streaming buffer. If the target changed (e.g. switching
1616    /// from thoughts to message text), flush the old buffer first.
1617    fn buffer_streaming_text(
1618        &mut self,
1619        markdown: &Entity<Markdown>,
1620        text: String,
1621        cx: &mut Context<Self>,
1622    ) {
1623        if let Some(buffer) = &mut self.streaming_text_buffer {
1624            if buffer.target.entity_id() == markdown.entity_id() {
1625                buffer.pending.push_str(&text);
1626
1627                buffer.bytes_to_reveal_per_tick = (buffer.pending.len() as f32
1628                    / StreamingTextBuffer::REVEAL_TARGET
1629                    * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1630                    .ceil() as usize;
1631                return;
1632            }
1633            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1634        }
1635
1636        let target = markdown.clone();
1637        let _reveal_task = self.start_streaming_reveal(cx);
1638        let pending_len = text.len();
1639        let bytes_to_reveal = (pending_len as f32 / StreamingTextBuffer::REVEAL_TARGET
1640            * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1641            .ceil() as usize;
1642        self.streaming_text_buffer = Some(StreamingTextBuffer {
1643            pending: text,
1644            bytes_to_reveal_per_tick: bytes_to_reveal,
1645            target,
1646            _reveal_task,
1647        });
1648    }
1649
1650    /// Flush all buffered streaming text into the Markdown entity immediately.
1651    fn flush_streaming_text(
1652        streaming_text_buffer: &mut Option<StreamingTextBuffer>,
1653        cx: &mut Context<Self>,
1654    ) {
1655        if let Some(buffer) = streaming_text_buffer.take() {
1656            if !buffer.pending.is_empty() {
1657                buffer
1658                    .target
1659                    .update(cx, |markdown, cx| markdown.append(&buffer.pending, cx));
1660            }
1661        }
1662    }
1663
1664    /// Spawns a foreground task that periodically drains
1665    /// `streaming_text_buffer.pending` into the target `Markdown` entity,
1666    /// producing smooth, continuous text output.
1667    fn start_streaming_reveal(&self, cx: &mut Context<Self>) -> Task<()> {
1668        cx.spawn(async move |this, cx| {
1669            loop {
1670                cx.background_executor()
1671                    .timer(Duration::from_millis(StreamingTextBuffer::TASK_UPDATE_MS))
1672                    .await;
1673
1674                let should_continue = this
1675                    .update(cx, |this, cx| {
1676                        let Some(buffer) = &mut this.streaming_text_buffer else {
1677                            return false;
1678                        };
1679
1680                        if buffer.pending.is_empty() {
1681                            return true;
1682                        }
1683
1684                        let pending_len = buffer.pending.len();
1685
1686                        let byte_boundary = buffer
1687                            .pending
1688                            .ceil_char_boundary(buffer.bytes_to_reveal_per_tick)
1689                            .min(pending_len);
1690
1691                        buffer.target.update(cx, |markdown: &mut Markdown, cx| {
1692                            markdown.append(&buffer.pending[..byte_boundary], cx);
1693                            buffer.pending.drain(..byte_boundary);
1694                        });
1695
1696                        true
1697                    })
1698                    .unwrap_or(false);
1699
1700                if !should_continue {
1701                    break;
1702                }
1703            }
1704        })
1705    }
1706
    /// Appends `entry` to the thread and emits `NewEntry`.
    ///
    /// Any buffered streaming text is flushed first, before the new entry is
    /// added.
    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
1712
    /// Returns `true` when the connection offers a title setter for this session.
    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
        self.connection.set_title(&self.session_id, cx).is_some()
    }
1716
1717    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1718        let had_provisional = self.provisional_title.take().is_some();
1719        if self.title.as_ref() != Some(&title) {
1720            self.title = Some(title.clone());
1721            cx.emit(AcpThreadEvent::TitleUpdated);
1722            if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1723                return set_title.run(title, cx);
1724            }
1725        } else if had_provisional {
1726            cx.emit(AcpThreadEvent::TitleUpdated);
1727        }
1728        Task::ready(Ok(()))
1729    }
1730
    /// Sets a provisional display title without propagating back to the
    /// underlying agent connection. This is used for quick preview titles
    /// (e.g. first 20 chars of the user message) that should be shown
    /// immediately but replaced once the LLM generates a proper title via
    /// `set_title`.
    ///
    /// Always emits `TitleUpdated`.
    pub fn set_provisional_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
        self.provisional_title = Some(title);
        cx.emit(AcpThreadEvent::TitleUpdated);
    }
1740
1741    pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
1742        cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
1743    }
1744
1745    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1746        self.token_usage = usage;
1747        cx.emit(AcpThreadEvent::TokenUsageUpdated);
1748    }
1749
1750    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1751        cx.emit(AcpThreadEvent::Retry(status));
1752    }
1753
1754    pub fn update_tool_call(
1755        &mut self,
1756        update: impl Into<ToolCallUpdate>,
1757        cx: &mut Context<Self>,
1758    ) -> Result<()> {
1759        let update = update.into();
1760        let languages = self.project.read(cx).languages().clone();
1761        let path_style = self.project.read(cx).path_style(cx);
1762
1763        let ix = match self.index_for_tool_call(update.id()) {
1764            Some(ix) => ix,
1765            None => {
1766                // Tool call not found - create a failed tool call entry
1767                let failed_tool_call = ToolCall {
1768                    id: update.id().clone(),
1769                    label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1770                    kind: acp::ToolKind::Fetch,
1771                    content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1772                        "Tool call not found".into(),
1773                        &languages,
1774                        path_style,
1775                        cx,
1776                    ))],
1777                    status: ToolCallStatus::Failed,
1778                    locations: Vec::new(),
1779                    resolved_locations: Vec::new(),
1780                    raw_input: None,
1781                    raw_input_markdown: None,
1782                    raw_output: None,
1783                    tool_name: None,
1784                    subagent_session_info: None,
1785                };
1786                self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1787                return Ok(());
1788            }
1789        };
1790        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1791            unreachable!()
1792        };
1793
1794        match update {
1795            ToolCallUpdate::UpdateFields(update) => {
1796                let location_updated = update.fields.locations.is_some();
1797                call.update_fields(
1798                    update.fields,
1799                    update.meta,
1800                    languages,
1801                    path_style,
1802                    &self.terminals,
1803                    cx,
1804                )?;
1805                if location_updated {
1806                    self.resolve_locations(update.tool_call_id, cx);
1807                }
1808            }
1809            ToolCallUpdate::UpdateDiff(update) => {
1810                call.content.clear();
1811                call.content.push(ToolCallContent::Diff(update.diff));
1812            }
1813            ToolCallUpdate::UpdateTerminal(update) => {
1814                call.content.clear();
1815                call.content
1816                    .push(ToolCallContent::Terminal(update.terminal));
1817            }
1818        }
1819
1820        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1821
1822        Ok(())
1823    }
1824
1825    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1826    pub fn upsert_tool_call(
1827        &mut self,
1828        tool_call: acp::ToolCall,
1829        cx: &mut Context<Self>,
1830    ) -> Result<(), acp::Error> {
1831        let status = tool_call.status.into();
1832        self.upsert_tool_call_inner(tool_call.into(), status, cx)
1833    }
1834
1835    /// Fails if id does not match an existing entry.
1836    pub fn upsert_tool_call_inner(
1837        &mut self,
1838        update: acp::ToolCallUpdate,
1839        status: ToolCallStatus,
1840        cx: &mut Context<Self>,
1841    ) -> Result<(), acp::Error> {
1842        let language_registry = self.project.read(cx).languages().clone();
1843        let path_style = self.project.read(cx).path_style(cx);
1844        let id = update.tool_call_id.clone();
1845
1846        let agent_telemetry_id = self.connection().telemetry_id();
1847        let session = self.session_id();
1848        let parent_session_id = self.parent_session_id();
1849        if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1850            let status = if matches!(status, ToolCallStatus::Completed) {
1851                "completed"
1852            } else {
1853                "failed"
1854            };
1855            telemetry::event!(
1856                "Agent Tool Call Completed",
1857                agent_telemetry_id,
1858                session,
1859                parent_session_id,
1860                status
1861            );
1862        }
1863
1864        if let Some(ix) = self.index_for_tool_call(&id) {
1865            let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1866                unreachable!()
1867            };
1868
1869            call.update_fields(
1870                update.fields,
1871                update.meta,
1872                language_registry,
1873                path_style,
1874                &self.terminals,
1875                cx,
1876            )?;
1877            call.status = status;
1878
1879            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1880        } else {
1881            let call = ToolCall::from_acp(
1882                update.try_into()?,
1883                status,
1884                language_registry,
1885                self.project.read(cx).path_style(cx),
1886                &self.terminals,
1887                cx,
1888            )?;
1889            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1890        };
1891
1892        self.resolve_locations(id, cx);
1893        Ok(())
1894    }
1895
1896    fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1897        self.entries
1898            .iter()
1899            .enumerate()
1900            .rev()
1901            .find_map(|(index, entry)| {
1902                if let AgentThreadEntry::ToolCall(tool_call) = entry
1903                    && &tool_call.id == id
1904                {
1905                    Some(index)
1906                } else {
1907                    None
1908                }
1909            })
1910    }
1911
1912    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1913        // The tool call we are looking for is typically the last one, or very close to the end.
1914        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1915        self.entries
1916            .iter_mut()
1917            .enumerate()
1918            .rev()
1919            .find_map(|(index, tool_call)| {
1920                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1921                    && &tool_call.id == id
1922                {
1923                    Some((index, tool_call))
1924                } else {
1925                    None
1926                }
1927            })
1928    }
1929
1930    pub fn tool_call(&self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1931        self.entries
1932            .iter()
1933            .enumerate()
1934            .rev()
1935            .find_map(|(index, tool_call)| {
1936                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1937                    && &tool_call.id == id
1938                {
1939                    Some((index, tool_call))
1940                } else {
1941                    None
1942                }
1943            })
1944    }
1945
1946    pub fn tool_call_for_subagent(&self, session_id: &acp::SessionId) -> Option<&ToolCall> {
1947        self.entries.iter().find_map(|entry| match entry {
1948            AgentThreadEntry::ToolCall(tool_call) => {
1949                if let Some(subagent_session_info) = &tool_call.subagent_session_info
1950                    && &subagent_session_info.session_id == session_id
1951                {
1952                    Some(tool_call)
1953                } else {
1954                    None
1955                }
1956            }
1957            _ => None,
1958        })
1959    }
1960
1961    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1962        let project = self.project.clone();
1963        let should_update_agent_location = self.parent_session_id.is_none();
1964        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1965            return;
1966        };
1967        let task = tool_call.resolve_locations(project, cx);
1968        cx.spawn(async move |this, cx| {
1969            let resolved_locations = task.await;
1970
1971            this.update(cx, |this, cx| {
1972                let project = this.project.clone();
1973
1974                for location in resolved_locations.iter().flatten() {
1975                    this.shared_buffers
1976                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1977                }
1978                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1979                    return;
1980                };
1981
1982                if let Some(Some(location)) = resolved_locations.last() {
1983                    project.update(cx, |project, cx| {
1984                        let should_ignore = if let Some(agent_location) = project
1985                            .agent_location()
1986                            .filter(|agent_location| agent_location.buffer == location.buffer)
1987                        {
1988                            let snapshot = location.buffer.read(cx).snapshot();
1989                            let old_position = agent_location.position.to_point(&snapshot);
1990                            let new_position = location.position.to_point(&snapshot);
1991
1992                            // ignore this so that when we get updates from the edit tool
1993                            // the position doesn't reset to the startof line
1994                            old_position.row == new_position.row
1995                                && old_position.column > new_position.column
1996                        } else {
1997                            false
1998                        };
1999                        if !should_ignore && should_update_agent_location {
2000                            project.set_agent_location(Some(location.into()), cx);
2001                        }
2002                    });
2003                }
2004
2005                let resolved_locations = resolved_locations
2006                    .iter()
2007                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
2008                    .collect::<Vec<_>>();
2009
2010                if tool_call.resolved_locations != resolved_locations {
2011                    tool_call.resolved_locations = resolved_locations;
2012                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
2013                }
2014            })
2015        })
2016        .detach();
2017    }
2018
2019    pub fn request_tool_call_authorization(
2020        &mut self,
2021        tool_call: acp::ToolCallUpdate,
2022        options: PermissionOptions,
2023        cx: &mut Context<Self>,
2024    ) -> Result<Task<RequestPermissionOutcome>> {
2025        let (tx, rx) = oneshot::channel();
2026
2027        let status = ToolCallStatus::WaitingForConfirmation {
2028            options,
2029            respond_tx: tx,
2030        };
2031
2032        let tool_call_id = tool_call.tool_call_id.clone();
2033        self.upsert_tool_call_inner(tool_call, status, cx)?;
2034        cx.emit(AcpThreadEvent::ToolAuthorizationRequested(
2035            tool_call_id.clone(),
2036        ));
2037
2038        Ok(cx.spawn(async move |this, cx| {
2039            let outcome = match rx.await {
2040                Ok(outcome) => RequestPermissionOutcome::Selected(outcome),
2041                Err(oneshot::Canceled) => RequestPermissionOutcome::Cancelled,
2042            };
2043            this.update(cx, |_this, cx| {
2044                cx.emit(AcpThreadEvent::ToolAuthorizationReceived(tool_call_id))
2045            })
2046            .ok();
2047            outcome
2048        }))
2049    }
2050
2051    pub fn authorize_tool_call(
2052        &mut self,
2053        id: acp::ToolCallId,
2054        outcome: SelectedPermissionOutcome,
2055        cx: &mut Context<Self>,
2056    ) {
2057        let Some((ix, call)) = self.tool_call_mut(&id) else {
2058            return;
2059        };
2060
2061        let new_status = match outcome.option_kind {
2062            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
2063                ToolCallStatus::Rejected
2064            }
2065            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
2066                ToolCallStatus::InProgress
2067            }
2068            _ => ToolCallStatus::InProgress,
2069        };
2070
2071        let curr_status = mem::replace(&mut call.status, new_status);
2072
2073        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
2074            respond_tx.send(outcome).log_err();
2075        } else if cfg!(debug_assertions) {
2076            panic!("tried to authorize an already authorized tool call");
2077        }
2078
2079        cx.emit(AcpThreadEvent::EntryUpdated(ix));
2080    }
2081
    /// Returns the thread's current plan.
    pub fn plan(&self) -> &Plan {
        &self.plan
    }
2085
2086    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
2087        let new_entries_len = request.entries.len();
2088        let mut new_entries = request.entries.into_iter();
2089
2090        // Reuse existing markdown to prevent flickering
2091        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
2092            let PlanEntry {
2093                content,
2094                priority,
2095                status,
2096            } = old;
2097            content.update(cx, |old, cx| {
2098                old.replace(new.content, cx);
2099            });
2100            *priority = new.priority;
2101            *status = new.status;
2102        }
2103        for new in new_entries {
2104            self.plan.entries.push(PlanEntry::from_acp(new, cx))
2105        }
2106        self.plan.entries.truncate(new_entries_len);
2107
2108        cx.notify();
2109    }
2110
2111    pub fn snapshot_completed_plan(&mut self, cx: &mut Context<Self>) {
2112        if !self.plan.is_empty() && self.plan.stats().pending == 0 {
2113            let completed_entries = std::mem::take(&mut self.plan.entries);
2114            self.push_entry(AgentThreadEntry::CompletedPlan(completed_entries), cx);
2115        }
2116    }
2117
2118    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
2119        self.plan
2120            .entries
2121            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
2122        cx.notify();
2123    }
2124
2125    pub fn clear_plan(&mut self, cx: &mut Context<Self>) {
2126        self.plan.entries.clear();
2127        cx.notify();
2128    }
2129
2130    #[cfg(any(test, feature = "test-support"))]
2131    pub fn send_raw(
2132        &mut self,
2133        message: &str,
2134        cx: &mut Context<Self>,
2135    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2136        self.send(vec![message.into()], cx)
2137    }
2138
2139    pub fn send(
2140        &mut self,
2141        message: Vec<acp::ContentBlock>,
2142        cx: &mut Context<Self>,
2143    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2144        let block = ContentBlock::new_combined(
2145            message.clone(),
2146            self.project.read(cx).languages().clone(),
2147            self.project.read(cx).path_style(cx),
2148            cx,
2149        );
2150        let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
2151        let git_store = self.project.read(cx).git_store().clone();
2152
2153        let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
2154            Some(UserMessageId::new())
2155        } else {
2156            None
2157        };
2158
2159        self.run_turn(cx, async move |this, cx| {
2160            this.update(cx, |this, cx| {
2161                this.push_entry(
2162                    AgentThreadEntry::UserMessage(UserMessage {
2163                        id: message_id.clone(),
2164                        content: block,
2165                        chunks: message,
2166                        checkpoint: None,
2167                        indented: false,
2168                    }),
2169                    cx,
2170                );
2171            })
2172            .ok();
2173
2174            let old_checkpoint = git_store
2175                .update(cx, |git, cx| git.checkpoint(cx))
2176                .await
2177                .context("failed to get old checkpoint")
2178                .log_err();
2179            this.update(cx, |this, cx| {
2180                if let Some((_ix, message)) = this.last_user_message() {
2181                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
2182                        git_checkpoint,
2183                        show: false,
2184                    });
2185                }
2186                this.connection.prompt(message_id, request, cx)
2187            })?
2188            .await
2189        })
2190    }
2191
2192    pub fn can_retry(&self, cx: &App) -> bool {
2193        self.connection.retry(&self.session_id, cx).is_some()
2194    }
2195
2196    pub fn retry(
2197        &mut self,
2198        cx: &mut Context<Self>,
2199    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2200        self.run_turn(cx, async move |this, cx| {
2201            this.update(cx, |this, cx| {
2202                this.connection
2203                    .retry(&this.session_id, cx)
2204                    .map(|retry| retry.run(cx))
2205            })?
2206            .context("retrying a session is not supported")?
2207            .await
2208        })
2209    }
2210
2211    fn run_turn(
2212        &mut self,
2213        cx: &mut Context<Self>,
2214        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
2215    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2216        self.clear_completed_plan_entries(cx);
2217        self.had_error = false;
2218
2219        let (tx, rx) = oneshot::channel();
2220        let cancel_task = self.cancel(cx);
2221
2222        self.turn_id += 1;
2223        let turn_id = self.turn_id;
2224        self.running_turn = Some(RunningTurn {
2225            id: turn_id,
2226            send_task: cx.spawn(async move |this, cx| {
2227                cancel_task.await;
2228                tx.send(f(this, cx).await).ok();
2229            }),
2230        });
2231
2232        cx.spawn(async move |this, cx| {
2233            let response = rx.await;
2234
2235            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
2236                .await?;
2237
2238            this.update(cx, |this, cx| {
2239                if this.parent_session_id.is_none() {
2240                    this.project
2241                        .update(cx, |project, cx| project.set_agent_location(None, cx));
2242                }
2243                let Ok(response) = response else {
2244                    // tx dropped, just return
2245                    return Ok(None);
2246                };
2247
2248                let is_same_turn = this
2249                    .running_turn
2250                    .as_ref()
2251                    .is_some_and(|turn| turn_id == turn.id);
2252
2253                // If the user submitted a follow up message, running_turn might
2254                // already point to a different turn. Therefore we only want to
2255                // take the task if it's the same turn.
2256                if is_same_turn {
2257                    this.running_turn.take();
2258                }
2259
2260                match response {
2261                    Ok(r) => {
2262                        Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2263
2264                        if r.stop_reason == acp::StopReason::MaxTokens {
2265                            this.had_error = true;
2266                            cx.emit(AcpThreadEvent::Error);
2267                            log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2268
2269                            let exceeded_max_output_tokens =
2270                                this.token_usage.as_ref().is_some_and(|u| {
2271                                    u.max_output_tokens
2272                                        .is_some_and(|max| u.output_tokens >= max)
2273                                });
2274
2275                            let message = if exceeded_max_output_tokens {
2276                                log::error!(
2277                                    "Max output tokens reached. Usage: {:?}",
2278                                    this.token_usage
2279                                );
2280                                "Maximum output tokens reached"
2281                            } else {
2282                                log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2283                                "Maximum tokens reached"
2284                            };
2285                            return Err(anyhow!(message));
2286                        }
2287
2288                        let canceled = matches!(r.stop_reason, acp::StopReason::Cancelled);
2289                        if canceled {
2290                            this.mark_pending_tools_as_canceled();
2291                        }
2292
2293                        if !canceled {
2294                            this.snapshot_completed_plan(cx);
2295                        }
2296
2297                        // Handle refusal - distinguish between user prompt and tool call refusals
2298                        if let acp::StopReason::Refusal = r.stop_reason {
2299                            this.had_error = true;
2300                            if let Some((user_msg_ix, _)) = this.last_user_message() {
2301                                // Check if there's a completed tool call with results after the last user message
2302                                // This indicates the refusal is in response to tool output, not the user's prompt
2303                                let has_completed_tool_call_after_user_msg =
2304                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
2305                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
2306                                            // Check if the tool call has completed and has output
2307                                            matches!(tool_call.status, ToolCallStatus::Completed)
2308                                                && tool_call.raw_output.is_some()
2309                                        } else {
2310                                            false
2311                                        }
2312                                    });
2313
2314                                if has_completed_tool_call_after_user_msg {
2315                                    // Refusal is due to tool output - don't truncate, just notify
2316                                    // The model refused based on what the tool returned
2317                                    cx.emit(AcpThreadEvent::Refusal);
2318                                } else {
2319                                    // User prompt was refused - truncate back to before the user message
2320                                    let range = user_msg_ix..this.entries.len();
2321                                    if range.start < range.end {
2322                                        this.entries.truncate(user_msg_ix);
2323                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
2324                                    }
2325                                    cx.emit(AcpThreadEvent::Refusal);
2326                                }
2327                            } else {
2328                                // No user message found, treat as general refusal
2329                                cx.emit(AcpThreadEvent::Refusal);
2330                            }
2331                        }
2332
2333                        cx.emit(AcpThreadEvent::Stopped(r.stop_reason));
2334                        Ok(Some(r))
2335                    }
2336                    Err(e) => {
2337                        Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2338
2339                        this.had_error = true;
2340                        cx.emit(AcpThreadEvent::Error);
2341                        log::error!("Error in run turn: {:?}", e);
2342                        Err(e)
2343                    }
2344                }
2345            })?
2346        })
2347        .boxed()
2348    }
2349
2350    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2351        let Some(turn) = self.running_turn.take() else {
2352            return Task::ready(());
2353        };
2354        self.connection.cancel(&self.session_id, cx);
2355
2356        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2357        self.mark_pending_tools_as_canceled();
2358
2359        // Wait for the send task to complete
2360        cx.background_spawn(turn.send_task)
2361    }
2362
2363    fn mark_pending_tools_as_canceled(&mut self) {
2364        for entry in self.entries.iter_mut() {
2365            if let AgentThreadEntry::ToolCall(call) = entry {
2366                let cancel = matches!(
2367                    call.status,
2368                    ToolCallStatus::Pending
2369                        | ToolCallStatus::WaitingForConfirmation { .. }
2370                        | ToolCallStatus::InProgress
2371                );
2372
2373                if cancel {
2374                    call.status = ToolCallStatus::Canceled;
2375                }
2376            }
2377        }
2378    }
2379
2380    /// Restores the git working tree to the state at the given checkpoint (if one exists)
2381    pub fn restore_checkpoint(
2382        &mut self,
2383        id: UserMessageId,
2384        cx: &mut Context<Self>,
2385    ) -> Task<Result<()>> {
2386        let Some((_, message)) = self.user_message_mut(&id) else {
2387            return Task::ready(Err(anyhow!("message not found")));
2388        };
2389
2390        let checkpoint = message
2391            .checkpoint
2392            .as_ref()
2393            .map(|c| c.git_checkpoint.clone());
2394
2395        // Cancel any in-progress generation before restoring
2396        let cancel_task = self.cancel(cx);
2397        let rewind = self.rewind(id.clone(), cx);
2398        let git_store = self.project.read(cx).git_store().clone();
2399
2400        cx.spawn(async move |_, cx| {
2401            cancel_task.await;
2402            rewind.await?;
2403            if let Some(checkpoint) = checkpoint {
2404                git_store
2405                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2406                    .await?;
2407            }
2408
2409            Ok(())
2410        })
2411    }
2412
2413    /// Rewinds this thread to before the entry at `index`, removing it and all
2414    /// subsequent entries while rejecting any action_log changes made from that point.
2415    /// Unlike `restore_checkpoint`, this method does not restore from git.
2416    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2417        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2418            return Task::ready(Err(anyhow!("not supported")));
2419        };
2420
2421        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2422        let telemetry = ActionLogTelemetry::from(&*self);
2423        cx.spawn(async move |this, cx| {
2424            cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2425            this.update(cx, |this, cx| {
2426                if let Some((ix, _)) = this.user_message_mut(&id) {
2427                    // Collect all terminals from entries that will be removed
2428                    let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2429                        .iter()
2430                        .flat_map(|entry| entry.terminals())
2431                        .filter_map(|terminal| terminal.read(cx).id().clone().into())
2432                        .collect();
2433
2434                    let range = ix..this.entries.len();
2435                    this.entries.truncate(ix);
2436                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
2437
2438                    // Kill and remove the terminals
2439                    for terminal_id in terminals_to_remove {
2440                        if let Some(terminal) = this.terminals.remove(&terminal_id) {
2441                            terminal.update(cx, |terminal, cx| {
2442                                terminal.kill(cx);
2443                            });
2444                        }
2445                    }
2446                }
2447                this.action_log().update(cx, |action_log, cx| {
2448                    action_log.reject_all_edits(Some(telemetry), cx)
2449                })
2450            })?
2451            .await;
2452            Ok(())
2453        })
2454    }
2455
2456    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2457        let git_store = self.project.read(cx).git_store().clone();
2458
2459        let Some((_, message)) = self.last_user_message() else {
2460            return Task::ready(Ok(()));
2461        };
2462        let Some(user_message_id) = message.id.clone() else {
2463            return Task::ready(Ok(()));
2464        };
2465        let Some(checkpoint) = message.checkpoint.as_ref() else {
2466            return Task::ready(Ok(()));
2467        };
2468        let old_checkpoint = checkpoint.git_checkpoint.clone();
2469
2470        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2471        cx.spawn(async move |this, cx| {
2472            let Some(new_checkpoint) = new_checkpoint
2473                .await
2474                .context("failed to get new checkpoint")
2475                .log_err()
2476            else {
2477                return Ok(());
2478            };
2479
2480            let equal = git_store
2481                .update(cx, |git, cx| {
2482                    git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2483                })
2484                .await
2485                .unwrap_or(true);
2486
2487            this.update(cx, |this, cx| {
2488                if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2489                    if let Some(checkpoint) = message.checkpoint.as_mut() {
2490                        checkpoint.show = !equal;
2491                        cx.emit(AcpThreadEvent::EntryUpdated(ix));
2492                    }
2493                }
2494            })?;
2495
2496            Ok(())
2497        })
2498    }
2499
2500    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2501        self.entries
2502            .iter_mut()
2503            .enumerate()
2504            .rev()
2505            .find_map(|(ix, entry)| {
2506                if let AgentThreadEntry::UserMessage(message) = entry {
2507                    Some((ix, message))
2508                } else {
2509                    None
2510                }
2511            })
2512    }
2513
2514    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2515        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2516            if let AgentThreadEntry::UserMessage(message) = entry {
2517                if message.id.as_ref() == Some(id) {
2518                    Some((ix, message))
2519                } else {
2520                    None
2521                }
2522            } else {
2523                None
2524            }
2525        })
2526    }
2527
2528    pub fn read_text_file(
2529        &self,
2530        path: PathBuf,
2531        line: Option<u32>,
2532        limit: Option<u32>,
2533        reuse_shared_snapshot: bool,
2534        cx: &mut Context<Self>,
2535    ) -> Task<Result<String, acp::Error>> {
2536        // Args are 1-based, move to 0-based
2537        let line = line.unwrap_or_default().saturating_sub(1);
2538        let limit = limit.unwrap_or(u32::MAX);
2539        let project = self.project.clone();
2540        let action_log = self.action_log.clone();
2541        let should_update_agent_location = self.parent_session_id.is_none();
2542        cx.spawn(async move |this, cx| {
2543            let load = project.update(cx, |project, cx| {
2544                let path = project
2545                    .project_path_for_absolute_path(&path, cx)
2546                    .ok_or_else(|| {
2547                        acp::Error::resource_not_found(Some(path.display().to_string()))
2548                    })?;
2549                Ok::<_, acp::Error>(project.open_buffer(path, cx))
2550            })?;
2551
2552            let buffer = load.await?;
2553
2554            let snapshot = if reuse_shared_snapshot {
2555                this.read_with(cx, |this, _| {
2556                    this.shared_buffers.get(&buffer.clone()).cloned()
2557                })
2558                .log_err()
2559                .flatten()
2560            } else {
2561                None
2562            };
2563
2564            let snapshot = if let Some(snapshot) = snapshot {
2565                snapshot
2566            } else {
2567                action_log.update(cx, |action_log, cx| {
2568                    action_log.buffer_read(buffer.clone(), cx);
2569                });
2570
2571                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
2572                this.update(cx, |this, _| {
2573                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2574                })?;
2575                snapshot
2576            };
2577
2578            let max_point = snapshot.max_point();
2579            let start_position = Point::new(line, 0);
2580
2581            if start_position > max_point {
2582                return Err(acp::Error::invalid_params().data(format!(
2583                    "Attempting to read beyond the end of the file, line {}:{}",
2584                    max_point.row + 1,
2585                    max_point.column
2586                )));
2587            }
2588
2589            let start = snapshot.anchor_before(start_position);
2590            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2591
2592            if should_update_agent_location {
2593                project.update(cx, |project, cx| {
2594                    project.set_agent_location(
2595                        Some(AgentLocation {
2596                            buffer: buffer.downgrade(),
2597                            position: start,
2598                        }),
2599                        cx,
2600                    );
2601                });
2602            }
2603
2604            Ok(snapshot.text_for_range(start..end).collect::<String>())
2605        })
2606    }
2607
2608    pub fn write_text_file(
2609        &self,
2610        path: PathBuf,
2611        content: String,
2612        cx: &mut Context<Self>,
2613    ) -> Task<Result<()>> {
2614        let project = self.project.clone();
2615        let action_log = self.action_log.clone();
2616        let should_update_agent_location = self.parent_session_id.is_none();
2617        cx.spawn(async move |this, cx| {
2618            let load = project.update(cx, |project, cx| {
2619                let path = project
2620                    .project_path_for_absolute_path(&path, cx)
2621                    .context("invalid path")?;
2622                anyhow::Ok(project.open_buffer(path, cx))
2623            });
2624            let buffer = load?.await?;
2625            let snapshot = this.update(cx, |this, cx| {
2626                this.shared_buffers
2627                    .get(&buffer)
2628                    .cloned()
2629                    .unwrap_or_else(|| buffer.read(cx).snapshot())
2630            })?;
2631            let edits = cx
2632                .background_executor()
2633                .spawn(async move {
2634                    let old_text = snapshot.text();
2635                    text_diff(old_text.as_str(), &content)
2636                        .into_iter()
2637                        .map(|(range, replacement)| {
2638                            (snapshot.anchor_range_inside(range), replacement)
2639                        })
2640                        .collect::<Vec<_>>()
2641                })
2642                .await;
2643
2644            if should_update_agent_location {
2645                project.update(cx, |project, cx| {
2646                    project.set_agent_location(
2647                        Some(AgentLocation {
2648                            buffer: buffer.downgrade(),
2649                            position: edits
2650                                .last()
2651                                .map(|(range, _)| range.end)
2652                                .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2653                        }),
2654                        cx,
2655                    );
2656                });
2657            }
2658
2659            let format_on_save = cx.update(|cx| {
2660                action_log.update(cx, |action_log, cx| {
2661                    action_log.buffer_read(buffer.clone(), cx);
2662                });
2663
2664                let format_on_save = buffer.update(cx, |buffer, cx| {
2665                    buffer.edit(edits, None, cx);
2666
2667                    let settings =
2668                        language::language_settings::LanguageSettings::for_buffer(buffer, cx);
2669
2670                    settings.format_on_save != FormatOnSave::Off
2671                });
2672                action_log.update(cx, |action_log, cx| {
2673                    action_log.buffer_edited(buffer.clone(), cx);
2674                });
2675                format_on_save
2676            });
2677
2678            if format_on_save {
2679                let format_task = project.update(cx, |project, cx| {
2680                    project.format(
2681                        HashSet::from_iter([buffer.clone()]),
2682                        LspFormatTarget::Buffers,
2683                        false,
2684                        FormatTrigger::Save,
2685                        cx,
2686                    )
2687                });
2688                format_task.await.log_err();
2689
2690                action_log.update(cx, |action_log, cx| {
2691                    action_log.buffer_edited(buffer.clone(), cx);
2692                });
2693            }
2694
2695            project
2696                .update(cx, |project, cx| project.save_buffer(buffer, cx))
2697                .await
2698        })
2699    }
2700
2701    pub fn create_terminal(
2702        &self,
2703        command: String,
2704        args: Vec<String>,
2705        extra_env: Vec<acp::EnvVariable>,
2706        cwd: Option<PathBuf>,
2707        output_byte_limit: Option<u64>,
2708        cx: &mut Context<Self>,
2709    ) -> Task<Result<Entity<Terminal>>> {
2710        let env = match &cwd {
2711            Some(dir) => self.project.update(cx, |project, cx| {
2712                project.environment().update(cx, |env, cx| {
2713                    env.directory_environment(dir.as_path().into(), cx)
2714                })
2715            }),
2716            None => Task::ready(None).shared(),
2717        };
2718        let env = cx.spawn(async move |_, _| {
2719            let mut env = env.await.unwrap_or_default();
2720            // Disables paging for `git` and hopefully other commands
2721            env.insert("PAGER".into(), "".into());
2722            for var in extra_env {
2723                env.insert(var.name, var.value);
2724            }
2725            env
2726        });
2727
2728        let project = self.project.clone();
2729        let language_registry = project.read(cx).languages().clone();
2730        let is_windows = project.read(cx).path_style(cx).is_windows();
2731
2732        let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2733        let terminal_task = cx.spawn({
2734            let terminal_id = terminal_id.clone();
2735            async move |_this, cx| {
2736                let env = env.await;
2737                let shell = project
2738                    .update(cx, |project, cx| {
2739                        project
2740                            .remote_client()
2741                            .and_then(|r| r.read(cx).default_system_shell())
2742                    })
2743                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2744                let (task_command, task_args) =
2745                    ShellBuilder::new(&Shell::Program(shell), is_windows)
2746                        .redirect_stdin_to_dev_null()
2747                        .build(Some(command.clone()), &args);
2748                let terminal = project
2749                    .update(cx, |project, cx| {
2750                        project.create_terminal_task(
2751                            task::SpawnInTerminal {
2752                                command: Some(task_command),
2753                                args: task_args,
2754                                cwd: cwd.clone(),
2755                                env,
2756                                ..Default::default()
2757                            },
2758                            cx,
2759                        )
2760                    })
2761                    .await?;
2762
2763                anyhow::Ok(cx.new(|cx| {
2764                    Terminal::new(
2765                        terminal_id,
2766                        &format!("{} {}", command, args.join(" ")),
2767                        cwd,
2768                        output_byte_limit.map(|l| l as usize),
2769                        terminal,
2770                        language_registry,
2771                        cx,
2772                    )
2773                }))
2774            }
2775        });
2776
2777        cx.spawn(async move |this, cx| {
2778            let terminal = terminal_task.await?;
2779            this.update(cx, |this, _cx| {
2780                this.terminals.insert(terminal_id, terminal.clone());
2781                terminal
2782            })
2783        })
2784    }
2785
2786    pub fn kill_terminal(
2787        &mut self,
2788        terminal_id: acp::TerminalId,
2789        cx: &mut Context<Self>,
2790    ) -> Result<()> {
2791        self.terminals
2792            .get(&terminal_id)
2793            .context("Terminal not found")?
2794            .update(cx, |terminal, cx| {
2795                terminal.kill(cx);
2796            });
2797
2798        Ok(())
2799    }
2800
2801    pub fn release_terminal(
2802        &mut self,
2803        terminal_id: acp::TerminalId,
2804        cx: &mut Context<Self>,
2805    ) -> Result<()> {
2806        self.terminals
2807            .remove(&terminal_id)
2808            .context("Terminal not found")?
2809            .update(cx, |terminal, cx| {
2810                terminal.kill(cx);
2811            });
2812
2813        Ok(())
2814    }
2815
2816    pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2817        self.terminals
2818            .get(&terminal_id)
2819            .context("Terminal not found")
2820            .cloned()
2821    }
2822
2823    pub fn to_markdown(&self, cx: &App) -> String {
2824        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2825    }
2826
2827    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2828        cx.emit(AcpThreadEvent::LoadError(error));
2829    }
2830
2831    pub fn register_terminal_created(
2832        &mut self,
2833        terminal_id: acp::TerminalId,
2834        command_label: String,
2835        working_dir: Option<PathBuf>,
2836        output_byte_limit: Option<u64>,
2837        terminal: Entity<::terminal::Terminal>,
2838        cx: &mut Context<Self>,
2839    ) -> Entity<Terminal> {
2840        let language_registry = self.project.read(cx).languages().clone();
2841
2842        let entity = cx.new(|cx| {
2843            Terminal::new(
2844                terminal_id.clone(),
2845                &command_label,
2846                working_dir.clone(),
2847                output_byte_limit.map(|l| l as usize),
2848                terminal,
2849                language_registry,
2850                cx,
2851            )
2852        });
2853        self.terminals.insert(terminal_id.clone(), entity.clone());
2854        entity
2855    }
2856
2857    pub fn mark_as_subagent_output(&mut self, cx: &mut Context<Self>) {
2858        for entry in self.entries.iter_mut().rev() {
2859            if let AgentThreadEntry::AssistantMessage(assistant_message) = entry {
2860                assistant_message.is_subagent_output = true;
2861                cx.notify();
2862                return;
2863            }
2864        }
2865    }
2866
2867    pub fn on_terminal_provider_event(
2868        &mut self,
2869        event: TerminalProviderEvent,
2870        cx: &mut Context<Self>,
2871    ) {
2872        match event {
2873            TerminalProviderEvent::Created {
2874                terminal_id,
2875                label,
2876                cwd,
2877                output_byte_limit,
2878                terminal,
2879            } => {
2880                let entity = self.register_terminal_created(
2881                    terminal_id.clone(),
2882                    label,
2883                    cwd,
2884                    output_byte_limit,
2885                    terminal,
2886                    cx,
2887                );
2888
2889                if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
2890                    for data in chunks.drain(..) {
2891                        entity.update(cx, |term, cx| {
2892                            term.inner().update(cx, |inner, cx| {
2893                                inner.write_output(&data, cx);
2894                            })
2895                        });
2896                    }
2897                }
2898
2899                if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
2900                    entity.update(cx, |_term, cx| {
2901                        cx.notify();
2902                    });
2903                }
2904
2905                cx.notify();
2906            }
2907            TerminalProviderEvent::Output { terminal_id, data } => {
2908                if let Some(entity) = self.terminals.get(&terminal_id) {
2909                    entity.update(cx, |term, cx| {
2910                        term.inner().update(cx, |inner, cx| {
2911                            inner.write_output(&data, cx);
2912                        })
2913                    });
2914                } else {
2915                    self.pending_terminal_output
2916                        .entry(terminal_id)
2917                        .or_default()
2918                        .push(data);
2919                }
2920            }
2921            TerminalProviderEvent::TitleChanged { terminal_id, title } => {
2922                if let Some(entity) = self.terminals.get(&terminal_id) {
2923                    entity.update(cx, |term, cx| {
2924                        term.inner().update(cx, |inner, cx| {
2925                            inner.breadcrumb_text = title;
2926                            cx.emit(::terminal::Event::BreadcrumbsChanged);
2927                        })
2928                    });
2929                }
2930            }
2931            TerminalProviderEvent::Exit {
2932                terminal_id,
2933                status,
2934            } => {
2935                if let Some(entity) = self.terminals.get(&terminal_id) {
2936                    entity.update(cx, |_term, cx| {
2937                        cx.notify();
2938                    });
2939                } else {
2940                    self.pending_terminal_exit.insert(terminal_id, status);
2941                }
2942            }
2943        }
2944    }
2945}
2946
2947fn markdown_for_raw_output(
2948    raw_output: &serde_json::Value,
2949    language_registry: &Arc<LanguageRegistry>,
2950    cx: &mut App,
2951) -> Option<Entity<Markdown>> {
2952    match raw_output {
2953        serde_json::Value::Null => None,
2954        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2955            Markdown::new(
2956                value.to_string().into(),
2957                Some(language_registry.clone()),
2958                None,
2959                cx,
2960            )
2961        })),
2962        serde_json::Value::Number(value) => Some(cx.new(|cx| {
2963            Markdown::new(
2964                value.to_string().into(),
2965                Some(language_registry.clone()),
2966                None,
2967                cx,
2968            )
2969        })),
2970        serde_json::Value::String(value) => Some(cx.new(|cx| {
2971            Markdown::new(
2972                value.clone().into(),
2973                Some(language_registry.clone()),
2974                None,
2975                cx,
2976            )
2977        })),
2978        value => Some(cx.new(|cx| {
2979            let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
2980
2981            Markdown::new(
2982                format!("```json\n{}\n```", pretty_json).into(),
2983                Some(language_registry.clone()),
2984                None,
2985                cx,
2986            )
2987        })),
2988    }
2989}
2990
2991#[cfg(test)]
2992mod tests {
2993    use super::*;
2994    use anyhow::anyhow;
2995    use futures::{channel::mpsc, future::LocalBoxFuture, select};
2996    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2997    use indoc::indoc;
2998    use project::{AgentId, FakeFs, Fs};
2999    use rand::{distr, prelude::*};
3000    use serde_json::json;
3001    use settings::SettingsStore;
3002    use smol::stream::StreamExt as _;
3003    use std::{
3004        any::Any,
3005        cell::RefCell,
3006        path::Path,
3007        rc::Rc,
3008        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
3009        time::Duration,
3010    };
3011    use util::{path, path_list::PathList};
3012
3013    fn init_test(cx: &mut TestAppContext) {
3014        env_logger::try_init().ok();
3015        cx.update(|cx| {
3016            let settings_store = SettingsStore::test(cx);
3017            cx.set_global(settings_store);
3018        });
3019    }
3020
3021    #[gpui::test]
3022    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
3023        init_test(cx);
3024
3025        let fs = FakeFs::new(cx.executor());
3026        let project = Project::test(fs, [], cx).await;
3027        let connection = Rc::new(FakeAgentConnection::new());
3028        let thread = cx
3029            .update(|cx| {
3030                connection.new_session(
3031                    project,
3032                    PathList::new(&[std::path::Path::new(path!("/test"))]),
3033                    cx,
3034                )
3035            })
3036            .await
3037            .unwrap();
3038
3039        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3040
3041        // Send Output BEFORE Created - should be buffered by acp_thread
3042        thread.update(cx, |thread, cx| {
3043            thread.on_terminal_provider_event(
3044                TerminalProviderEvent::Output {
3045                    terminal_id: terminal_id.clone(),
3046                    data: b"hello buffered".to_vec(),
3047                },
3048                cx,
3049            );
3050        });
3051
3052        // Create a display-only terminal and then send Created
3053        let lower = cx.new(|cx| {
3054            let builder = ::terminal::TerminalBuilder::new_display_only(
3055                ::terminal::terminal_settings::CursorShape::default(),
3056                ::terminal::terminal_settings::AlternateScroll::On,
3057                None,
3058                0,
3059                cx.background_executor(),
3060                PathStyle::local(),
3061            )
3062            .unwrap();
3063            builder.subscribe(cx)
3064        });
3065
3066        thread.update(cx, |thread, cx| {
3067            thread.on_terminal_provider_event(
3068                TerminalProviderEvent::Created {
3069                    terminal_id: terminal_id.clone(),
3070                    label: "Buffered Test".to_string(),
3071                    cwd: None,
3072                    output_byte_limit: None,
3073                    terminal: lower.clone(),
3074                },
3075                cx,
3076            );
3077        });
3078
3079        // After Created, buffered Output should have been flushed into the renderer
3080        let content = thread.read_with(cx, |thread, cx| {
3081            let term = thread.terminal(terminal_id.clone()).unwrap();
3082            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3083        });
3084
3085        assert!(
3086            content.contains("hello buffered"),
3087            "expected buffered output to render, got: {content}"
3088        );
3089    }
3090
3091    #[gpui::test]
3092    async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
3093        init_test(cx);
3094
3095        let fs = FakeFs::new(cx.executor());
3096        let project = Project::test(fs, [], cx).await;
3097        let connection = Rc::new(FakeAgentConnection::new());
3098        let thread = cx
3099            .update(|cx| {
3100                connection.new_session(
3101                    project,
3102                    PathList::new(&[std::path::Path::new(path!("/test"))]),
3103                    cx,
3104                )
3105            })
3106            .await
3107            .unwrap();
3108
3109        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3110
3111        // Send Output BEFORE Created
3112        thread.update(cx, |thread, cx| {
3113            thread.on_terminal_provider_event(
3114                TerminalProviderEvent::Output {
3115                    terminal_id: terminal_id.clone(),
3116                    data: b"pre-exit data".to_vec(),
3117                },
3118                cx,
3119            );
3120        });
3121
3122        // Send Exit BEFORE Created
3123        thread.update(cx, |thread, cx| {
3124            thread.on_terminal_provider_event(
3125                TerminalProviderEvent::Exit {
3126                    terminal_id: terminal_id.clone(),
3127                    status: acp::TerminalExitStatus::new().exit_code(0),
3128                },
3129                cx,
3130            );
3131        });
3132
3133        // Now create a display-only lower-level terminal and send Created
3134        let lower = cx.new(|cx| {
3135            let builder = ::terminal::TerminalBuilder::new_display_only(
3136                ::terminal::terminal_settings::CursorShape::default(),
3137                ::terminal::terminal_settings::AlternateScroll::On,
3138                None,
3139                0,
3140                cx.background_executor(),
3141                PathStyle::local(),
3142            )
3143            .unwrap();
3144            builder.subscribe(cx)
3145        });
3146
3147        thread.update(cx, |thread, cx| {
3148            thread.on_terminal_provider_event(
3149                TerminalProviderEvent::Created {
3150                    terminal_id: terminal_id.clone(),
3151                    label: "Buffered Exit Test".to_string(),
3152                    cwd: None,
3153                    output_byte_limit: None,
3154                    terminal: lower.clone(),
3155                },
3156                cx,
3157            );
3158        });
3159
3160        // Output should be present after Created (flushed from buffer)
3161        let content = thread.read_with(cx, |thread, cx| {
3162            let term = thread.terminal(terminal_id.clone()).unwrap();
3163            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3164        });
3165
3166        assert!(
3167            content.contains("pre-exit data"),
3168            "expected pre-exit data to render, got: {content}"
3169        );
3170    }
3171
3172    /// Test that killing a terminal via Terminal::kill properly:
3173    /// 1. Causes wait_for_exit to complete (doesn't hang forever)
3174    /// 2. The underlying terminal still has the output that was written before the kill
3175    ///
3176    /// This test verifies that the fix to kill_active_task (which now also kills
3177    /// the shell process in addition to the foreground process) properly allows
3178    /// wait_for_exit to complete instead of hanging indefinitely.
3179    #[cfg(unix)]
3180    #[gpui::test]
3181    async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
3182        use std::collections::HashMap;
3183        use task::Shell;
3184        use util::shell_builder::ShellBuilder;
3185
3186        init_test(cx);
3187        cx.executor().allow_parking();
3188
3189        let fs = FakeFs::new(cx.executor());
3190        let project = Project::test(fs, [], cx).await;
3191        let connection = Rc::new(FakeAgentConnection::new());
3192        let thread = cx
3193            .update(|cx| {
3194                connection.new_session(
3195                    project.clone(),
3196                    PathList::new(&[Path::new(path!("/test"))]),
3197                    cx,
3198                )
3199            })
3200            .await
3201            .unwrap();
3202
3203        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3204
3205        // Create a real PTY terminal that runs a command which prints output then sleeps
3206        // We use printf instead of echo and chain with && sleep to ensure proper execution
3207        let (completion_tx, _completion_rx) = smol::channel::unbounded();
3208        let (program, args) = ShellBuilder::new(&Shell::System, false).build(
3209            Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
3210            &[],
3211        );
3212
3213        let builder = cx
3214            .update(|cx| {
3215                ::terminal::TerminalBuilder::new(
3216                    None,
3217                    None,
3218                    task::Shell::WithArguments {
3219                        program,
3220                        args,
3221                        title_override: None,
3222                    },
3223                    HashMap::default(),
3224                    ::terminal::terminal_settings::CursorShape::default(),
3225                    ::terminal::terminal_settings::AlternateScroll::On,
3226                    None,
3227                    vec![],
3228                    0,
3229                    false,
3230                    0,
3231                    Some(completion_tx),
3232                    cx,
3233                    vec![],
3234                    PathStyle::local(),
3235                )
3236            })
3237            .await
3238            .unwrap();
3239
3240        let lower_terminal = cx.new(|cx| builder.subscribe(cx));
3241
3242        // Create the acp_thread Terminal wrapper
3243        thread.update(cx, |thread, cx| {
3244            thread.on_terminal_provider_event(
3245                TerminalProviderEvent::Created {
3246                    terminal_id: terminal_id.clone(),
3247                    label: "printf output_before_kill && sleep 60".to_string(),
3248                    cwd: None,
3249                    output_byte_limit: None,
3250                    terminal: lower_terminal.clone(),
3251                },
3252                cx,
3253            );
3254        });
3255
3256        // Poll until the printf command produces output, rather than using a
3257        // fixed sleep which is flaky on loaded machines.
3258        let deadline = std::time::Instant::now() + Duration::from_secs(10);
3259        loop {
3260            let has_output = thread.read_with(cx, |thread, cx| {
3261                let term = thread
3262                    .terminals
3263                    .get(&terminal_id)
3264                    .expect("terminal not found");
3265                let content = term.read(cx).inner().read(cx).get_content();
3266                content.contains("output_before_kill")
3267            });
3268            if has_output {
3269                break;
3270            }
3271            assert!(
3272                std::time::Instant::now() < deadline,
3273                "Timed out waiting for printf output to appear in terminal",
3274            );
3275            cx.executor().timer(Duration::from_millis(50)).await;
3276        }
3277
3278        // Get the acp_thread Terminal and kill it
3279        let wait_for_exit = thread.update(cx, |thread, cx| {
3280            let term = thread.terminals.get(&terminal_id).unwrap();
3281            let wait_for_exit = term.read(cx).wait_for_exit();
3282            term.update(cx, |term, cx| {
3283                term.kill(cx);
3284            });
3285            wait_for_exit
3286        });
3287
3288        // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
3289        // Before the fix to kill_active_task, this would hang forever because
3290        // only the foreground process was killed, not the shell, so the PTY
3291        // child never exited and wait_for_completed_task never completed.
3292        let exit_result = futures::select! {
3293            result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
3294            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
3295        };
3296
3297        assert!(
3298            exit_result.is_some(),
3299            "wait_for_exit should complete after kill, but it timed out. \
3300            This indicates kill_active_task is not properly killing the shell process."
3301        );
3302
3303        // Give the system a chance to process any pending updates
3304        cx.run_until_parked();
3305
3306        // Verify that the underlying terminal still has the output that was
3307        // written before the kill. This verifies that killing doesn't lose output.
3308        let inner_content = thread.read_with(cx, |thread, cx| {
3309            let term = thread.terminals.get(&terminal_id).unwrap();
3310            term.read(cx).inner().read(cx).get_content()
3311        });
3312
3313        assert!(
3314            inner_content.contains("output_before_kill"),
3315            "Underlying terminal should contain output from before kill, got: {}",
3316            inner_content
3317        );
3318    }
3319
3320    #[gpui::test]
3321    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
3322        // Covers the three outcomes of `push_user_content_block`: starting a user
3323        // message, extending the trailing one, and opening a fresh one once an
3324        // assistant entry sits in between.
3325        init_test(cx);
3326
3327        let fake_fs = FakeFs::new(cx.executor());
3328        let project = Project::test(fake_fs, [], cx).await;
3329        let connection = Rc::new(FakeAgentConnection::new());
3330        let session = cx.update(|cx| {
3331            connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3332        });
3333        let thread = session.await.unwrap();
3334
3335        // Step 1: an empty thread gets a brand-new user message without an id.
3336        thread.update(cx, |this, cx| {
3337            this.push_user_content_block(None, "Hello, ".into(), cx);
3338        });
3339
3340        thread.update(cx, |this, cx| {
3341            assert_eq!(this.entries.len(), 1);
3342            let AgentThreadEntry::UserMessage(message) = &this.entries[0] else {
3343                panic!("Expected UserMessage");
3344            };
3345            assert_eq!(message.id, None);
3346            assert_eq!(message.content.to_markdown(cx), "Hello, ");
3347        });
3348
3349        // Step 2: a second push, this time with an id, lands in that same entry;
3350        // the text is concatenated and the entry takes on the id.
3351        let first_id = UserMessageId::new();
3352        thread.update(cx, |this, cx| {
3353            this.push_user_content_block(Some(first_id.clone()), "world!".into(), cx);
3354        });
3355
3356        thread.update(cx, |this, cx| {
3357            assert_eq!(this.entries.len(), 1);
3358            let AgentThreadEntry::UserMessage(message) = &this.entries[0] else {
3359                panic!("Expected UserMessage");
3360            };
3361            assert_eq!(message.id, Some(first_id));
3362            assert_eq!(message.content.to_markdown(cx), "Hello, world!");
3363        });
3364
3365        // Step 3: once an assistant block has been pushed, the next user block
3366        // has to open a separate entry instead of extending the first one.
3367        thread.update(cx, |this, cx| {
3368            this.push_assistant_content_block("Assistant response".into(), false, cx);
3369        });
3370
3371        let second_id = UserMessageId::new();
3372        thread.update(cx, |this, cx| {
3373            this.push_user_content_block(
3374                Some(second_id.clone()),
3375                "New user message".into(),
3376                cx,
3377            );
3378        });
3379
3380        thread.update(cx, |this, cx| {
3381            assert_eq!(this.entries.len(), 3);
3382            let AgentThreadEntry::UserMessage(message) = &this.entries[2] else {
3383                panic!("Expected UserMessage at index 2");
3384            };
3385            assert_eq!(message.id, Some(second_id));
3386            assert_eq!(message.content.to_markdown(cx), "New user message");
3387        });
3388    }
3389
3390    #[gpui::test]
3391    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
3392        // Two consecutive `AgentThoughtChunk` updates are expected to show up as a
3393        // single `<thinking>` section when the thread is exported to markdown.
3394        init_test(cx);
3395
3396        let fake_fs = FakeFs::new(cx.executor());
3397        let project = Project::test(fake_fs, [], cx).await;
3398
3399        // Fake agent handler: feeds the thought to the thread in two pieces and
3400        // then reports `StopReason::EndTurn`.
3401        let agent = FakeAgentConnection::new().on_user_message(|_, thread, mut cx| {
3402            async move {
3403                thread.update(&mut cx, |this, cx| {
3404                    for piece in ["Thinking ", "hard!"] {
3405                        let chunk = acp::ContentChunk::new(piece.into());
3406                        this.handle_session_update(
3407                            acp::SessionUpdate::AgentThoughtChunk(chunk),
3408                            cx,
3409                        )
3410                        .unwrap();
3411                    }
3412                })?;
3413                Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3414            }
3415            .boxed_local()
3416        });
3417        let connection = Rc::new(agent);
3418
3419        let session = cx.update(|cx| {
3420            connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3421        });
3422        let thread = session.await.unwrap();
3423
3424        // Send a single prompt and wait for the turn to finish.
3425        let turn = thread.update(cx, |this, cx| this.send_raw("Hello from Zed!", cx));
3426        turn.await.unwrap();
3427
3428        // The two pieces must appear joined, with nothing inserted between them.
3429        let markdown = thread.read_with(cx, |this, cx| this.to_markdown(cx));
3430        assert_eq!(
3431            markdown,
3432            indoc! {r#"
3433            ## User
3434
3435            Hello from Zed!
3436
3437            ## Assistant
3438
3439            <thinking>
3440            Thinking hard!
3441            </thinking>
3442
3443            "#}
3444        );
3445    }
3452
3453    #[gpui::test]
3454    async fn test_ignore_echoed_user_message_chunks_during_active_turn(
3455        cx: &mut gpui::TestAppContext,
3456    ) {
3457        // The fake agent hands the prompt straight back as a `UserMessageChunk`
3458        // from inside its prompt handler; the exported markdown must still
3459        // contain the prompt text exactly once.
3460        init_test(cx);
3461
3462        let fake_fs = FakeFs::new(cx.executor());
3463        let project = Project::test(fake_fs, [], cx).await;
3464
3465        let agent = FakeAgentConnection::new().on_user_message(|request, thread, mut cx| {
3466            async move {
3467                // Echo the first prompt block; fall back to `"".into()` if the
3468                // prompt is empty.
3469                let echoed = match request.prompt.first() {
3470                    Some(block) => block.clone(),
3471                    None => "".into(),
3472                };
3473                let update = acp::SessionUpdate::UserMessageChunk(acp::ContentChunk::new(echoed));
3474
3475                thread.update(&mut cx, |this, cx| {
3476                    this.handle_session_update(update, cx).unwrap();
3477                })?;
3478
3479                Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3480            }
3481            .boxed_local()
3482        });
3483        let connection = Rc::new(agent);
3484
3485        let session = cx.update(|cx| {
3486            connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3487        });
3488        let thread = session.await.unwrap();
3489
3490        let turn = thread.update(cx, |this, cx| this.send_raw("Hello from Zed!", cx));
3491        turn.await.unwrap();
3492
3493        // The echoed chunk must not add a second copy of the prompt to the output.
3494        let markdown = thread.read_with(cx, |this, cx| this.to_markdown(cx));
3495        let occurrences = markdown.matches("Hello from Zed!").count();
3496        assert_eq!(occurrences, 1);
3497    }
3498
3499    #[gpui::test]
3500    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
            // Simulates the user editing a buffer in the middle of an agent turn: the
            // fake agent reads /tmp/foo, the test then inserts a line into the open
            // buffer, and the agent's later write must leave both changes in the
            // buffer and in the file on the fake filesystem.
3501        init_test(cx);
3502
3503        let fs = FakeFs::new(cx.executor());
3504        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3505            .await;
3506        let project = Project::test(fs.clone(), [], cx).await;
            // One-shot signal the agent handler fires after its read. The sender is
            // wrapped in `Rc<RefCell<Option<_>>>` so the handler closure can clone the
            // handle and `take()` the sender when it runs.
3507        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3508        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3509        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3510            move |_, thread, mut cx| {
3511                let read_file_tx = read_file_tx.clone();
3512                async move {
                        // Agent step 1: read the file and check its initial contents.
3513                    let content = thread
3514                        .update(&mut cx, |thread, cx| {
3515                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3516                        })
3517                        .unwrap()
3518                        .await
3519                        .unwrap();
3520                    assert_eq!(content, "one\ntwo\nthree\n");
                        // Tell the test body that the read is done so it can make its
                        // own edit.
3521                    read_file_tx.take().unwrap().send(()).unwrap();
                        // Agent step 2: write the file back with two lines appended.
3522                    thread
3523                        .update(&mut cx, |thread, cx| {
3524                            thread.write_text_file(
3525                                path!("/tmp/foo").into(),
3526                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
3527                                cx,
3528                            )
3529                        })
3530                        .unwrap()
3531                        .await
3532                        .unwrap();
3533                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3534                }
3535                .boxed_local()
3536            },
3537        ));
3538
            // Open the same file as a project buffer so the test can edit it directly.
3539        let (worktree, pathbuf) = project
3540            .update(cx, |project, cx| {
3541                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3542            })
3543            .await
3544            .unwrap();
3545        let buffer = project
3546            .update(cx, |project, cx| {
3547                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3548            })
3549            .await
3550            .unwrap();
3551
3552        let thread = cx
3553            .update(|cx| {
3554                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3555            })
3556            .await
3557            .unwrap();
3558
            // Start the turn without awaiting it yet.
3559        let request = thread.update(cx, |thread, cx| {
3560            thread.send_raw("Extend the count in /tmp/foo", cx)
3561        });
            // Wait for the handler's signal (the receive result is discarded), then
            // make the "user" edit: insert "zero\n" at the start of the buffer.
3562        read_file_rx.await.ok();
3563        buffer.update(cx, |buffer, cx| {
3564            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3565        });
            // Run all pending work; the assertions below expect the agent's appended
            // lines and the user's inserted line to both be present afterwards.
3566        cx.run_until_parked();
3567        assert_eq!(
3568            buffer.read_with(cx, |buffer, _| buffer.text()),
3569            "zero\none\ntwo\nthree\nfour\nfive\n"
3570        );
3571        assert_eq!(
3572            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3573            "zero\none\ntwo\nthree\nfour\nfive\n"
3574        );
            // Finally, the turn itself must complete without an error.
3575        request.await.unwrap();
3576    }
3577
3578    #[gpui::test]
3579    async fn test_reading_from_line(cx: &mut TestAppContext) {
3580        // Checks what `read_text_file` returns for a four-line file under each
3581        // combination of its two optional arguments, plus one rejected request.
3582        init_test(cx);
3583
3584        let fake_fs = FakeFs::new(cx.executor());
3585        let tree = json!({"foo": "one\ntwo\nthree\nfour\n"});
3586        fake_fs.insert_tree(path!("/tmp"), tree).await;
3587        let project = Project::test(fake_fs.clone(), [], cx).await;
3588        let worktree_task = project.update(cx, |project, cx| {
3589            project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3590        });
3591        worktree_task.await.unwrap();
3592
3593        let connection = Rc::new(FakeAgentConnection::new());
3594        let session = cx.update(|cx| {
3595            connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3596        });
3597        let thread = session.await.unwrap();
3598
3599        // Each row is (start line, limit, expected content). Judging by the
3600        // expectations, the start line is 1-based and the limit counts lines.
3601        let accepted = [
3602            // Whole file
3603            (None, None, "one\ntwo\nthree\nfour\n"),
3604            // Only start line
3605            (Some(3), None, "three\nfour\n"),
3606            // Only limit
3607            (None, Some(2), "one\ntwo\n"),
3608            // Range
3609            (Some(2), Some(2), "two\nthree\n"),
3610        ];
3611        for (line, limit, expected) in accepted {
3612            let read = thread.update(cx, |this, cx| {
3613                this.read_text_file(path!("/tmp/foo").into(), line, limit, false, cx)
3614            });
3615            assert_eq!(read.await.unwrap(), expected);
3616        }
3617
3618        // Invalid: the requested start line lies past the end of the file.
3619        let rejected = thread.update(cx, |this, cx| {
3620            this.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3621        });
3622        let err = rejected.await.unwrap_err();
3623        assert_eq!(
3624            err.to_string(),
3625            "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3626        );
3627    }
3655
3656    #[gpui::test]
3657    async fn test_reading_empty_file(cx: &mut TestAppContext) {
3658        // For an empty file every accepted argument combination of
3659        // `read_text_file` yields "", while a start line past the end is an error.
3660        init_test(cx);
3661
3662        let fake_fs = FakeFs::new(cx.executor());
3663        fake_fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3664        let project = Project::test(fake_fs.clone(), [], cx).await;
3665        let worktree_task = project.update(cx, |project, cx| {
3666            project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3667        });
3668        worktree_task.await.unwrap();
3669
3670        let connection = Rc::new(FakeAgentConnection::new());
3671        let session = cx.update(|cx| {
3672            connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3673        });
3674        let thread = session.await.unwrap();
3675
3676        // Each row is (start line, limit); all of them must read back as "".
3677        let accepted = [
3678            // Whole file
3679            (None, None),
3680            // Only start line
3681            (Some(1), None),
3682            // Only limit
3683            (None, Some(2)),
3684            // Range
3685            (Some(1), Some(1)),
3686        ];
3687        for (line, limit) in accepted {
3688            let read = thread.update(cx, |this, cx| {
3689                this.read_text_file(path!("/tmp/foo").into(), line, limit, false, cx)
3690            });
3691            assert_eq!(read.await.unwrap(), "");
3692        }
3693
3694        // Invalid: the requested start line lies past the end of the empty file.
3695        let rejected = thread.update(cx, |this, cx| {
3696            this.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3697        });
3698        let err = rejected.await.unwrap_err();
3699        assert_eq!(
3700            err.to_string(),
3701            "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3702        );
3703    }
3732    #[gpui::test]
3733    async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3734        // Reading a path that is outside the project's only worktree (`/tmp`)
3735        // must fail with `ErrorCode::ResourceNotFound`.
3736        init_test(cx);
3737
3738        let fake_fs = FakeFs::new(cx.executor());
3739        fake_fs.insert_tree(path!("/tmp"), json!({})).await;
3740        let project = Project::test(fake_fs.clone(), [], cx).await;
3741        let worktree_task = project.update(cx, |project, cx| {
3742            project.find_or_create_worktree(path!("/tmp"), true, cx)
3743        });
3744        worktree_task.await.unwrap();
3745
3746        let connection = Rc::new(FakeAgentConnection::new());
3747        let session = cx.update(|cx| {
3748            connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3749        });
3750        let thread = session.await.unwrap();
3751
3752        // Out of project file
3753        let read = thread.update(cx, |this, cx| {
3754            this.read_text_file(path!("/foo").into(), None, None, false, cx)
3755        });
3756        let err = read.await.unwrap_err();
3757
3758        assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3759    }
3765
3766    #[gpui::test]
3767    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
            // A tool call that `cancel` has marked as canceled can still be moved to
            // `Completed` by a `ToolCallUpdate` that arrives afterwards.
3768        init_test(cx);
3769
3770        let fs = FakeFs::new(cx.executor());
3771        let project = Project::test(fs, [], cx).await;
3772        let id = acp::ToolCallId::new("test");
3773
            // Fake agent handler: reports one in-progress Fetch tool call under `id`,
            // then ends the turn.
3774        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3775            let id = id.clone();
3776            move |_, thread, mut cx| {
3777                let id = id.clone();
3778                async move {
3779                    thread
3780                        .update(&mut cx, |thread, cx| {
3781                            thread.handle_session_update(
3782                                acp::SessionUpdate::ToolCall(
3783                                    acp::ToolCall::new(id.clone(), "Label")
3784                                        .kind(acp::ToolKind::Fetch)
3785                                        .status(acp::ToolCallStatus::InProgress),
3786                                ),
3787                                cx,
3788                            )
3789                        })
3790                        .unwrap()
3791                        .unwrap();
3792                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3793                }
3794                .boxed_local()
3795            }
3796        }));
3797
3798        let thread = cx
3799            .update(|cx| {
3800                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3801            })
3802            .await
3803            .unwrap();
3804
            // Start the turn without awaiting it yet.
3805        let request = thread.update(cx, |thread, cx| {
3806            thread.send_raw("Fetch https://example.com", cx)
3807        });
3808
            // NOTE(review): `run_until_first_tool_call` is a helper defined elsewhere
            // in this file; presumably it waits until the first tool call entry
            // exists — confirm.
3809        run_until_first_tool_call(&thread, cx).await;
3810
            // Entry 1 (right after the user message) is the tool call, still running.
3811        thread.read_with(cx, |thread, _| {
3812            assert!(matches!(
3813                thread.entries[1],
3814                AgentThreadEntry::ToolCall(ToolCall {
3815                    status: ToolCallStatus::InProgress,
3816                    ..
3817                })
3818            ));
3819        });
3820
            // Cancel the turn: the tool call entry must now read `Canceled`.
3821        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3822
3823        thread.read_with(cx, |thread, _| {
3824            assert!(matches!(
3825                &thread.entries[1],
3826                AgentThreadEntry::ToolCall(ToolCall {
3827                    status: ToolCallStatus::Canceled,
3828                    ..
3829                })
3830            ));
3831        });
3832
            // A status update for the same tool call id arrives after the cancel.
3833        thread
3834            .update(cx, |thread, cx| {
3835                thread.handle_session_update(
3836                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3837                        id,
3838                        acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3839                    )),
3840                    cx,
3841                )
3842            })
3843            .unwrap();
3844
3845        request.await.unwrap();
3846
            // The late `Completed` status has replaced `Canceled`.
3847        thread.read_with(cx, |thread, _| {
3848            assert!(matches!(
3849                thread.entries[1],
3850                AgentThreadEntry::ToolCall(ToolCall {
3851                    status: ToolCallStatus::Completed,
3852                    ..
3853                })
3854            ));
3855        });
3856    }
3857
3858    #[gpui::test]
3859    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3860        // An Edit tool call that is reported as already `Completed` must not be
3861        // counted by `has_pending_edit_tool_calls` once the turn is over.
3862        init_test(cx);
3863        let fake_fs = FakeFs::new(cx.background_executor.clone());
3864        fake_fs.insert_tree(path!("/test"), json!({})).await;
3865        let project = Project::test(fake_fs, [path!("/test").as_ref()], cx).await;
3866
3867        let agent = FakeAgentConnection::new().on_user_message(|_, thread, mut cx| {
3868            async move {
3869                // A finished edit carrying a single diff for /test/test.txt.
3870                let diff = acp::Diff::new("/test/test.txt", "foo");
3871                let tool_call = acp::ToolCall::new("test", "Label")
3872                    .kind(acp::ToolKind::Edit)
3873                    .status(acp::ToolCallStatus::Completed)
3874                    .content(vec![acp::ToolCallContent::Diff(diff)]);
3875
3876                thread
3877                    .update(&mut cx, |this, cx| {
3878                        this.handle_session_update(acp::SessionUpdate::ToolCall(tool_call), cx)
3879                    })
3880                    .unwrap()
3881                    .unwrap();
3882                Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3883            }
3884            .boxed_local()
3885        });
3886        let connection = Rc::new(agent);
3887
3888        let session = cx.update(|cx| {
3889            connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3890        });
3891        let thread = session.await.unwrap();
3892
3893        let turn = cx.update(|cx| thread.update(cx, |this, cx| this.send(vec!["Hi".into()], cx)));
3894        turn.await.unwrap();
3895
3896        let has_pending = cx.read(|cx| thread.read(cx).has_pending_edit_tool_calls());
3897        assert!(!has_pending);
3898    }
3904
3905    #[gpui::test(iterations = 10)]
3906    async fn test_checkpoints(cx: &mut TestAppContext) {
            // Sends three prompts through a fake agent that, while `simulate_changes`
            // is set, writes a new empty file per prompt. Checks which user messages
            // are rendered with "(checkpoint)" and that `restore_checkpoint` on the
            // second user message rolls back both the history and the created files.
3907        init_test(cx);
3908        let fs = FakeFs::new(cx.background_executor.clone());
3909        fs.insert_tree(
3910            path!("/test"),
3911            json!({
3912                ".git": {}
3913            }),
3914        )
3915        .await;
3916        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3917
            // Shared knobs for the fake agent: whether it should touch the
            // filesystem, and the counter used to name the next file it creates.
3918        let simulate_changes = Arc::new(AtomicBool::new(true));
3919        let next_filename = Arc::new(AtomicUsize::new(0));
3920        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3921            let simulate_changes = simulate_changes.clone();
3922            let next_filename = next_filename.clone();
3923            let fs = fs.clone();
3924            move |request, thread, mut cx| {
3925                let fs = fs.clone();
3926                let simulate_changes = simulate_changes.clone();
3927                let next_filename = next_filename.clone();
3928                async move {
3929                    if simulate_changes.load(SeqCst) {
3930                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3931                        fs.write(Path::new(&filename), b"").await?;
3932                    }
3933
                        // Reply with the text of the first prompt block in upper case.
3934                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3935                        panic!("expected text content block");
3936                    };
3937                    thread.update(&mut cx, |thread, cx| {
3938                        thread
3939                            .handle_session_update(
3940                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3941                                    content.text.to_uppercase().into(),
3942                                )),
3943                                cx,
3944                            )
3945                            .unwrap();
3946                    })?;
3947                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3948                }
3949                .boxed_local()
3950            }
3951        }));
3952        let thread = cx
3953            .update(|cx| {
3954                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3955            })
3956            .await
3957            .unwrap();
3958
            // Turn 1: the agent creates file-0, and the user message is rendered
            // with the "(checkpoint)" marker.
3959        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3960            .await
3961            .unwrap();
3962        thread.read_with(cx, |thread, cx| {
3963            assert_eq!(
3964                thread.to_markdown(cx),
3965                indoc! {"
3966                    ## User (checkpoint)
3967
3968                    Lorem
3969
3970                    ## Assistant
3971
3972                    LOREM
3973
3974                "}
3975            );
3976        });
3977        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3978
            // Turn 2: same again, now with file-1 added.
3979        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3980            .await
3981            .unwrap();
3982        thread.read_with(cx, |thread, cx| {
3983            assert_eq!(
3984                thread.to_markdown(cx),
3985                indoc! {"
3986                    ## User (checkpoint)
3987
3988                    Lorem
3989
3990                    ## Assistant
3991
3992                    LOREM
3993
3994                    ## User (checkpoint)
3995
3996                    ipsum
3997
3998                    ## Assistant
3999
4000                    IPSUM
4001
4002                "}
4003            );
4004        });
4005        assert_eq!(
4006            fs.files(),
4007            vec![
4008                Path::new(path!("/test/file-0")),
4009                Path::new(path!("/test/file-1"))
4010            ]
4011        );
4012
4013        // Checkpoint isn't stored when there are no changes.
4014        simulate_changes.store(false, SeqCst);
4015        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
4016            .await
4017            .unwrap();
4018        thread.read_with(cx, |thread, cx| {
4019            assert_eq!(
4020                thread.to_markdown(cx),
4021                indoc! {"
4022                    ## User (checkpoint)
4023
4024                    Lorem
4025
4026                    ## Assistant
4027
4028                    LOREM
4029
4030                    ## User (checkpoint)
4031
4032                    ipsum
4033
4034                    ## Assistant
4035
4036                    IPSUM
4037
4038                    ## User
4039
4040                    dolor
4041
4042                    ## Assistant
4043
4044                    DOLOR
4045
4046                "}
4047            );
4048        });
4049        assert_eq!(
4050            fs.files(),
4051            vec![
4052                Path::new(path!("/test/file-0")),
4053                Path::new(path!("/test/file-1"))
4054            ]
4055        );
4056
4057        // Rewinding the conversation truncates the history and restores the checkpoint.
            // `entries[2]` is the second user message ("ipsum"); after the restore only
            // the first exchange and file-0 remain.
4058        thread
4059            .update(cx, |thread, cx| {
4060                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
4061                    panic!("unexpected entries {:?}", thread.entries)
4062                };
4063                thread.restore_checkpoint(message.id.clone().unwrap(), cx)
4064            })
4065            .await
4066            .unwrap();
4067        thread.read_with(cx, |thread, cx| {
4068            assert_eq!(
4069                thread.to_markdown(cx),
4070                indoc! {"
4071                    ## User (checkpoint)
4072
4073                    Lorem
4074
4075                    ## Assistant
4076
4077                    LOREM
4078
4079                "}
4080            );
4081        });
4082        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
4083    }
4084
4085    #[gpui::test]
4086    async fn test_tool_result_refusal(cx: &mut TestAppContext) {
4087        // A refusal that follows a tool result must emit `Refusal` and keep the thread's entries.
4088        init_test(cx);
4089
4090        let fs = FakeFs::new(cx.executor());
4091        let project = Project::test(fs, None, cx).await;
4092
4093        // Create a connection that simulates refusal after tool result
4094        let prompt_count = Arc::new(AtomicUsize::new(0));
4095        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4096            let prompt_count = prompt_count.clone();
4097            move |_request, thread, mut cx| {
4098                let count = prompt_count.fetch_add(1, SeqCst);
4099                async move {
4100                    if count == 0 {
4101                        // First prompt: Generate a tool call with result
4102                        thread.update(&mut cx, |thread, cx| {
4103                            thread
4104                                .handle_session_update(
4105                                    acp::SessionUpdate::ToolCall(
4106                                        acp::ToolCall::new("tool1", "Test Tool")
4107                                            .kind(acp::ToolKind::Fetch)
4108                                            .status(acp::ToolCallStatus::Completed)
4109                                            .raw_input(serde_json::json!({"query": "test"}))
4110                                            .raw_output(serde_json::json!({"result": "inappropriate content"})),
4111                                    ),
4112                                    cx,
4113                                )
4114                                .unwrap();
4115                        })?;
4116
4117                        // Now return refusal because of the tool result
4118                        Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
4119                    } else {
4120                        Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4121                    }
4122                }
4123                .boxed_local()
4124            }
4125        }));
4126
4127        let thread = cx
4128            .update(|cx| {
4129                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4130            })
4131            .await
4132            .unwrap();
4133
4134        // Track if we see a Refusal event (an atomic flag; a single bool needs no lock)
4135        let saw_refusal_event = Arc::new(AtomicBool::new(false));
4136        let saw_refusal_event_captured = saw_refusal_event.clone();
4137        thread.update(cx, |_thread, cx| {
4138            cx.subscribe(
4139                &thread,
4140                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4141                    if matches!(event, AcpThreadEvent::Refusal) {
4142                        saw_refusal_event_captured.store(true, SeqCst);
4143                    }
4144                },
4145            )
4146            .detach();
4147        });
4148
4149        // Send a user message - this will trigger tool call and then refusal
4150        let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
4151        cx.background_executor.spawn(send_task).detach();
4152        cx.run_until_parked();
4153
4154        // Verify that:
4155        // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
4156        // 2. The user message was NOT truncated
4157        assert!(
4158            saw_refusal_event.load(SeqCst),
4159            "Refusal event should be emitted for tool result refusals"
4160        );
4161
4162        thread.read_with(cx, |thread, _| {
4163            let entries = thread.entries();
4164            assert!(entries.len() >= 2, "Should have user message and tool call");
4165
4166            // Verify user message is still there
4167            assert!(
4168                matches!(entries[0], AgentThreadEntry::UserMessage(_)),
4169                "User message should not be truncated"
4170            );
4171
4172            // Verify tool call is there with result
4173            if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
4174                assert!(
4175                    tool_call.raw_output.is_some(),
4176                    "Tool call should have output"
4177                );
4178            } else {
4179                panic!("Expected tool call at index 1");
4180            }
4181        });
4182    }
4183
4184    #[gpui::test]
4185    async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
4186        init_test(cx);
4187
4188        let fs = FakeFs::new(cx.executor());
4189        let project = Project::test(fs, None, cx).await;
4190
4191        let refuse_next = Arc::new(AtomicBool::new(false));
4192        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4193            let refuse_next = refuse_next.clone();
4194            move |_request, _thread, _cx| {
4195                if refuse_next.load(SeqCst) {
4196                    async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
4197                        .boxed_local()
4198                } else {
4199                    async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
4200                        .boxed_local()
4201                }
4202            }
4203        }));
4204
4205        let thread = cx
4206            .update(|cx| {
4207                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4208            })
4209            .await
4210            .unwrap();
4211
4212        // Track if we see a Refusal event
4213        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4214        let saw_refusal_event_captured = saw_refusal_event.clone();
4215        thread.update(cx, |_thread, cx| {
4216            cx.subscribe(
4217                &thread,
4218                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4219                    if matches!(event, AcpThreadEvent::Refusal) {
4220                        *saw_refusal_event_captured.lock().unwrap() = true;
4221                    }
4222                },
4223            )
4224            .detach();
4225        });
4226
4227        // Send a message that will be refused
4228        refuse_next.store(true, SeqCst);
4229        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4230            .await
4231            .unwrap();
4232
4233        // Verify that a Refusal event WAS emitted for user prompt refusal
4234        assert!(
4235            *saw_refusal_event.lock().unwrap(),
4236            "Refusal event should be emitted for user prompt refusals"
4237        );
4238
4239        // Verify the message was truncated (user prompt refusal)
4240        thread.read_with(cx, |thread, cx| {
4241            assert_eq!(thread.to_markdown(cx), "");
4242        });
4243    }
4244
4245    #[gpui::test]
4246    async fn test_refusal(cx: &mut TestAppContext) {
4247        init_test(cx);
4248        let fs = FakeFs::new(cx.background_executor.clone());
4249        fs.insert_tree(path!("/"), json!({})).await;
4250        let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
4251
4252        let refuse_next = Arc::new(AtomicBool::new(false));
4253        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4254            let refuse_next = refuse_next.clone();
4255            move |request, thread, mut cx| {
4256                let refuse_next = refuse_next.clone();
4257                async move {
4258                    if refuse_next.load(SeqCst) {
4259                        return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
4260                    }
4261
4262                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
4263                        panic!("expected text content block");
4264                    };
4265                    thread.update(&mut cx, |thread, cx| {
4266                        thread
4267                            .handle_session_update(
4268                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
4269                                    content.text.to_uppercase().into(),
4270                                )),
4271                                cx,
4272                            )
4273                            .unwrap();
4274                    })?;
4275                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4276                }
4277                .boxed_local()
4278            }
4279        }));
4280        let thread = cx
4281            .update(|cx| {
4282                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4283            })
4284            .await
4285            .unwrap();
4286
4287        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4288            .await
4289            .unwrap();
4290        thread.read_with(cx, |thread, cx| {
4291            assert_eq!(
4292                thread.to_markdown(cx),
4293                indoc! {"
4294                    ## User
4295
4296                    hello
4297
4298                    ## Assistant
4299
4300                    HELLO
4301
4302                "}
4303            );
4304        });
4305
4306        // Simulate refusing the second message. The message should be truncated
4307        // when a user prompt is refused.
4308        refuse_next.store(true, SeqCst);
4309        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
4310            .await
4311            .unwrap();
4312        thread.read_with(cx, |thread, cx| {
4313            assert_eq!(
4314                thread.to_markdown(cx),
4315                indoc! {"
4316                    ## User
4317
4318                    hello
4319
4320                    ## Assistant
4321
4322                    HELLO
4323
4324                "}
4325            );
4326        });
4327    }
4328
4329    async fn run_until_first_tool_call(
4330        thread: &Entity<AcpThread>,
4331        cx: &mut TestAppContext,
4332    ) -> usize {
4333        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
4334
4335        let subscription = cx.update(|cx| {
4336            cx.subscribe(thread, move |thread, _, cx| {
4337                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
4338                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
4339                        return tx.try_send(ix).unwrap();
4340                    }
4341                }
4342            })
4343        });
4344
4345        select! {
4346            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
4347                panic!("Timeout waiting for tool call")
4348            }
4349            ix = rx.next().fuse() => {
4350                drop(subscription);
4351                ix.unwrap()
4352            }
4353        }
4354    }
4355
    /// In-memory `AgentConnection` implementation used by the tests in this module.
    #[derive(Clone, Default)]
    struct FakeAgentConnection {
        /// Auth methods reported by `auth_methods` and accepted by `authenticate`.
        auth_methods: Vec<acp::AuthMethod>,
        /// Threads created by `new_session`, keyed by session id; `prompt` looks up
        /// the target thread here.
        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
        /// Titles recorded by the `FakeAgentSessionSetTitle` handed out from `set_title`.
        set_title_calls: Rc<RefCell<Vec<SharedString>>>,
        /// Optional callback that `prompt` runs to produce the response. When it is
        /// `None`, `prompt` answers with `StopReason::EndTurn`.
        on_user_message: Option<
            Rc<
                dyn Fn(
                        acp::PromptRequest,
                        WeakEntity<AcpThread>,
                        AsyncApp,
                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
                    + 'static,
            >,
        >,
    }
4372
4373    impl FakeAgentConnection {
4374        fn new() -> Self {
4375            Self {
4376                auth_methods: Vec::new(),
4377                on_user_message: None,
4378                sessions: Arc::default(),
4379                set_title_calls: Default::default(),
4380            }
4381        }
4382
4383        #[expect(unused)]
4384        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
4385            self.auth_methods = auth_methods;
4386            self
4387        }
4388
4389        fn on_user_message(
4390            mut self,
4391            handler: impl Fn(
4392                acp::PromptRequest,
4393                WeakEntity<AcpThread>,
4394                AsyncApp,
4395            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4396            + 'static,
4397        ) -> Self {
4398            self.on_user_message.replace(Rc::new(handler));
4399            self
4400        }
4401    }
4402
4403    impl AgentConnection for FakeAgentConnection {
4404        fn agent_id(&self) -> AgentId {
4405            AgentId::new("fake")
4406        }
4407
4408        fn telemetry_id(&self) -> SharedString {
4409            "fake".into()
4410        }
4411
4412        fn auth_methods(&self) -> &[acp::AuthMethod] {
4413            &self.auth_methods
4414        }
4415
4416        fn new_session(
4417            self: Rc<Self>,
4418            project: Entity<Project>,
4419            work_dirs: PathList,
4420            cx: &mut App,
4421        ) -> Task<gpui::Result<Entity<AcpThread>>> {
4422            let session_id = acp::SessionId::new(
4423                rand::rng()
4424                    .sample_iter(&distr::Alphanumeric)
4425                    .take(7)
4426                    .map(char::from)
4427                    .collect::<String>(),
4428            );
4429            let action_log = cx.new(|_| ActionLog::new(project.clone()));
4430            let thread = cx.new(|cx| {
4431                AcpThread::new(
4432                    None,
4433                    None,
4434                    Some(work_dirs),
4435                    self.clone(),
4436                    project,
4437                    action_log,
4438                    session_id.clone(),
4439                    watch::Receiver::constant(
4440                        acp::PromptCapabilities::new()
4441                            .image(true)
4442                            .audio(true)
4443                            .embedded_context(true),
4444                    ),
4445                    cx,
4446                )
4447            });
4448            self.sessions.lock().insert(session_id, thread.downgrade());
4449            Task::ready(Ok(thread))
4450        }
4451
4452        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
4453            if self.auth_methods().iter().any(|m| m.id() == &method) {
4454                Task::ready(Ok(()))
4455            } else {
4456                Task::ready(Err(anyhow!("Invalid Auth Method")))
4457            }
4458        }
4459
4460        fn prompt(
4461            &self,
4462            _id: Option<UserMessageId>,
4463            params: acp::PromptRequest,
4464            cx: &mut App,
4465        ) -> Task<gpui::Result<acp::PromptResponse>> {
4466            let sessions = self.sessions.lock();
4467            let thread = sessions.get(&params.session_id).unwrap();
4468            if let Some(handler) = &self.on_user_message {
4469                let handler = handler.clone();
4470                let thread = thread.clone();
4471                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
4472            } else {
4473                Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
4474            }
4475        }
4476
4477        fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
4478
4479        fn truncate(
4480            &self,
4481            session_id: &acp::SessionId,
4482            _cx: &App,
4483        ) -> Option<Rc<dyn AgentSessionTruncate>> {
4484            Some(Rc::new(FakeAgentSessionEditor {
4485                _session_id: session_id.clone(),
4486            }))
4487        }
4488
4489        fn set_title(
4490            &self,
4491            _session_id: &acp::SessionId,
4492            _cx: &App,
4493        ) -> Option<Rc<dyn AgentSessionSetTitle>> {
4494            Some(Rc::new(FakeAgentSessionSetTitle {
4495                calls: self.set_title_calls.clone(),
4496            }))
4497        }
4498
4499        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
4500            self
4501        }
4502    }
4503
4504    struct FakeAgentSessionSetTitle {
4505        calls: Rc<RefCell<Vec<SharedString>>>,
4506    }
4507
4508    impl AgentSessionSetTitle for FakeAgentSessionSetTitle {
4509        fn run(&self, title: SharedString, _cx: &mut App) -> Task<Result<()>> {
4510            self.calls.borrow_mut().push(title);
4511            Task::ready(Ok(()))
4512        }
4513    }
4514
    /// `AgentSessionTruncate` stub returned by `FakeAgentConnection::truncate`.
    struct FakeAgentSessionEditor {
        /// Session this editor was created for; stored but not read by `run`.
        _session_id: acp::SessionId,
    }

    impl AgentSessionTruncate for FakeAgentSessionEditor {
        /// Ignores the message id and immediately reports success.
        fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
            Task::ready(Ok(()))
        }
    }
4524
4525    #[gpui::test]
4526    async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
4527        init_test(cx);
4528
4529        let fs = FakeFs::new(cx.executor());
4530        let project = Project::test(fs, [], cx).await;
4531        let connection = Rc::new(FakeAgentConnection::new());
4532        let thread = cx
4533            .update(|cx| {
4534                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4535            })
4536            .await
4537            .unwrap();
4538
4539        // Try to update a tool call that doesn't exist
4540        let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4541        thread.update(cx, |thread, cx| {
4542            let result = thread.handle_session_update(
4543                acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4544                    nonexistent_id.clone(),
4545                    acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4546                )),
4547                cx,
4548            );
4549
4550            // The update should succeed (not return an error)
4551            assert!(result.is_ok());
4552
4553            // There should now be exactly one entry in the thread
4554            assert_eq!(thread.entries.len(), 1);
4555
4556            // The entry should be a failed tool call
4557            if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4558                assert_eq!(tool_call.id, nonexistent_id);
4559                assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4560                assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4561
4562                // Check that the content contains the error message
4563                assert_eq!(tool_call.content.len(), 1);
4564                if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4565                    match content_block {
4566                        ContentBlock::Markdown { markdown } => {
4567                            let markdown_text = markdown.read(cx).source();
4568                            assert!(markdown_text.contains("Tool call not found"));
4569                        }
4570                        ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4571                        ContentBlock::ResourceLink { .. } => {
4572                            panic!("Expected markdown content, got resource link")
4573                        }
4574                        ContentBlock::Image { .. } => {
4575                            panic!("Expected markdown content, got image")
4576                        }
4577                    }
4578                } else {
4579                    panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4580                }
4581            } else {
4582                panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4583            }
4584        });
4585    }
4586
4587    /// Tests that restoring a checkpoint properly cleans up terminals that were
4588    /// created after that checkpoint, and cancels any in-progress generation.
4589    ///
4590    /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4591    /// that were started after that checkpoint should be terminated, and any in-progress
4592    /// AI generation should be canceled.
4593    #[gpui::test]
4594    async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4595        init_test(cx);
4596
4597        let fs = FakeFs::new(cx.executor());
4598        let project = Project::test(fs, [], cx).await;
4599        let connection = Rc::new(FakeAgentConnection::new());
4600        let thread = cx
4601            .update(|cx| {
4602                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4603            })
4604            .await
4605            .unwrap();
4606
4607        // Send first user message to create a checkpoint
4608        cx.update(|cx| {
4609            thread.update(cx, |thread, cx| {
4610                thread.send(vec!["first message".into()], cx)
4611            })
4612        })
4613        .await
4614        .unwrap();
4615
4616        // Send second message (creates another checkpoint) - we'll restore to this one
4617        cx.update(|cx| {
4618            thread.update(cx, |thread, cx| {
4619                thread.send(vec!["second message".into()], cx)
4620            })
4621        })
4622        .await
4623        .unwrap();
4624
4625        // Create 2 terminals BEFORE the checkpoint that have completed running
4626        let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4627        let mock_terminal_1 = cx.new(|cx| {
4628            let builder = ::terminal::TerminalBuilder::new_display_only(
4629                ::terminal::terminal_settings::CursorShape::default(),
4630                ::terminal::terminal_settings::AlternateScroll::On,
4631                None,
4632                0,
4633                cx.background_executor(),
4634                PathStyle::local(),
4635            )
4636            .unwrap();
4637            builder.subscribe(cx)
4638        });
4639
4640        thread.update(cx, |thread, cx| {
4641            thread.on_terminal_provider_event(
4642                TerminalProviderEvent::Created {
4643                    terminal_id: terminal_id_1.clone(),
4644                    label: "echo 'first'".to_string(),
4645                    cwd: Some(PathBuf::from("/test")),
4646                    output_byte_limit: None,
4647                    terminal: mock_terminal_1.clone(),
4648                },
4649                cx,
4650            );
4651        });
4652
4653        thread.update(cx, |thread, cx| {
4654            thread.on_terminal_provider_event(
4655                TerminalProviderEvent::Output {
4656                    terminal_id: terminal_id_1.clone(),
4657                    data: b"first\n".to_vec(),
4658                },
4659                cx,
4660            );
4661        });
4662
4663        thread.update(cx, |thread, cx| {
4664            thread.on_terminal_provider_event(
4665                TerminalProviderEvent::Exit {
4666                    terminal_id: terminal_id_1.clone(),
4667                    status: acp::TerminalExitStatus::new().exit_code(0),
4668                },
4669                cx,
4670            );
4671        });
4672
4673        let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4674        let mock_terminal_2 = cx.new(|cx| {
4675            let builder = ::terminal::TerminalBuilder::new_display_only(
4676                ::terminal::terminal_settings::CursorShape::default(),
4677                ::terminal::terminal_settings::AlternateScroll::On,
4678                None,
4679                0,
4680                cx.background_executor(),
4681                PathStyle::local(),
4682            )
4683            .unwrap();
4684            builder.subscribe(cx)
4685        });
4686
4687        thread.update(cx, |thread, cx| {
4688            thread.on_terminal_provider_event(
4689                TerminalProviderEvent::Created {
4690                    terminal_id: terminal_id_2.clone(),
4691                    label: "echo 'second'".to_string(),
4692                    cwd: Some(PathBuf::from("/test")),
4693                    output_byte_limit: None,
4694                    terminal: mock_terminal_2.clone(),
4695                },
4696                cx,
4697            );
4698        });
4699
4700        thread.update(cx, |thread, cx| {
4701            thread.on_terminal_provider_event(
4702                TerminalProviderEvent::Output {
4703                    terminal_id: terminal_id_2.clone(),
4704                    data: b"second\n".to_vec(),
4705                },
4706                cx,
4707            );
4708        });
4709
4710        thread.update(cx, |thread, cx| {
4711            thread.on_terminal_provider_event(
4712                TerminalProviderEvent::Exit {
4713                    terminal_id: terminal_id_2.clone(),
4714                    status: acp::TerminalExitStatus::new().exit_code(0),
4715                },
4716                cx,
4717            );
4718        });
4719
4720        // Get the second message ID to restore to
4721        let second_message_id = thread.read_with(cx, |thread, _| {
4722            // At this point we have:
4723            // - Index 0: First user message (with checkpoint)
4724            // - Index 1: Second user message (with checkpoint)
4725            // No assistant responses because FakeAgentConnection just returns EndTurn
4726            let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4727                panic!("expected user message at index 1");
4728            };
4729            message.id.clone().unwrap()
4730        });
4731
4732        // Create a terminal AFTER the checkpoint we'll restore to.
4733        // This simulates the AI agent starting a long-running terminal command.
4734        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4735        let mock_terminal = cx.new(|cx| {
4736            let builder = ::terminal::TerminalBuilder::new_display_only(
4737                ::terminal::terminal_settings::CursorShape::default(),
4738                ::terminal::terminal_settings::AlternateScroll::On,
4739                None,
4740                0,
4741                cx.background_executor(),
4742                PathStyle::local(),
4743            )
4744            .unwrap();
4745            builder.subscribe(cx)
4746        });
4747
4748        // Register the terminal as created
4749        thread.update(cx, |thread, cx| {
4750            thread.on_terminal_provider_event(
4751                TerminalProviderEvent::Created {
4752                    terminal_id: terminal_id.clone(),
4753                    label: "sleep 1000".to_string(),
4754                    cwd: Some(PathBuf::from("/test")),
4755                    output_byte_limit: None,
4756                    terminal: mock_terminal.clone(),
4757                },
4758                cx,
4759            );
4760        });
4761
4762        // Simulate the terminal producing output (still running)
4763        thread.update(cx, |thread, cx| {
4764            thread.on_terminal_provider_event(
4765                TerminalProviderEvent::Output {
4766                    terminal_id: terminal_id.clone(),
4767                    data: b"terminal is running...\n".to_vec(),
4768                },
4769                cx,
4770            );
4771        });
4772
4773        // Create a tool call entry that references this terminal
4774        // This represents the agent requesting a terminal command
4775        thread.update(cx, |thread, cx| {
4776            thread
4777                .handle_session_update(
4778                    acp::SessionUpdate::ToolCall(
4779                        acp::ToolCall::new("terminal-tool-1", "Running command")
4780                            .kind(acp::ToolKind::Execute)
4781                            .status(acp::ToolCallStatus::InProgress)
4782                            .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4783                                terminal_id.clone(),
4784                            ))])
4785                            .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4786                    ),
4787                    cx,
4788                )
4789                .unwrap();
4790        });
4791
4792        // Verify terminal exists and is in the thread
4793        let terminal_exists_before =
4794            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4795        assert!(
4796            terminal_exists_before,
4797            "Terminal should exist before checkpoint restore"
4798        );
4799
4800        // Verify the terminal's underlying task is still running (not completed)
4801        let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4802            let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4803            terminal_entity.read_with(cx, |term, _cx| {
4804                term.output().is_none() // output is None means it's still running
4805            })
4806        });
4807        assert!(
4808            terminal_running_before,
4809            "Terminal should be running before checkpoint restore"
4810        );
4811
4812        // Verify we have the expected entries before restore
4813        let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4814        assert!(
4815            entry_count_before > 1,
4816            "Should have multiple entries before restore"
4817        );
4818
4819        // Restore the checkpoint to the second message.
4820        // This should:
4821        // 1. Cancel any in-progress generation (via the cancel() call)
4822        // 2. Remove the terminal that was created after that point
4823        thread
4824            .update(cx, |thread, cx| {
4825                thread.restore_checkpoint(second_message_id, cx)
4826            })
4827            .await
4828            .unwrap();
4829
4830        // Verify that no send_task is in progress after restore
4831        // (cancel() clears the send_task)
4832        let has_send_task_after = thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4833        assert!(
4834            !has_send_task_after,
4835            "Should not have a send_task after restore (cancel should have cleared it)"
4836        );
4837
4838        // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4839        let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4840        assert_eq!(
4841            entry_count, 1,
4842            "Should have 1 entry after restore (only the first user message)"
4843        );
4844
4845        // Verify the 2 completed terminals from before the checkpoint still exist
4846        let terminal_1_exists = thread.read_with(cx, |thread, _| {
4847            thread.terminals.contains_key(&terminal_id_1)
4848        });
4849        assert!(
4850            terminal_1_exists,
4851            "Terminal 1 (from before checkpoint) should still exist"
4852        );
4853
4854        let terminal_2_exists = thread.read_with(cx, |thread, _| {
4855            thread.terminals.contains_key(&terminal_id_2)
4856        });
4857        assert!(
4858            terminal_2_exists,
4859            "Terminal 2 (from before checkpoint) should still exist"
4860        );
4861
4862        // Verify they're still in completed state
4863        let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4864            let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4865            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4866        });
4867        assert!(terminal_1_completed, "Terminal 1 should still be completed");
4868
4869        let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4870            let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4871            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4872        });
4873        assert!(terminal_2_completed, "Terminal 2 should still be completed");
4874
4875        // Verify the running terminal (created after checkpoint) was removed
4876        let terminal_3_exists =
4877            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4878        assert!(
4879            !terminal_3_exists,
4880            "Terminal 3 (created after checkpoint) should have been removed"
4881        );
4882
4883        // Verify total count is 2 (the two from before the checkpoint)
4884        let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4885        assert_eq!(
4886            terminal_count, 2,
4887            "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4888        );
4889    }
4890
4891    /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4892    /// even when a new user message is added while the async checkpoint comparison is in progress.
4893    ///
4894    /// This is a regression test for a bug where update_last_checkpoint would fail with
4895    /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4896    /// update_last_checkpoint started and when its async closure ran.
4897    #[gpui::test]
4898    async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4899        init_test(cx);
4900
4901        let fs = FakeFs::new(cx.executor());
4902        fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4903            .await;
4904        let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4905
4906        let handler_done = Arc::new(AtomicBool::new(false));
4907        let handler_done_clone = handler_done.clone();
4908        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4909            move |_, _thread, _cx| {
4910                handler_done_clone.store(true, SeqCst);
4911                async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4912            },
4913        ));
4914
4915        let thread = cx
4916            .update(|cx| {
4917                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4918            })
4919            .await
4920            .unwrap();
4921
4922        let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4923        let send_task = cx.background_executor.spawn(send_future);
4924
4925        // Tick until handler completes, then a few more to let update_last_checkpoint start
4926        while !handler_done.load(SeqCst) {
4927            cx.executor().tick();
4928        }
4929        for _ in 0..5 {
4930            cx.executor().tick();
4931        }
4932
4933        thread.update(cx, |thread, cx| {
4934            thread.push_entry(
4935                AgentThreadEntry::UserMessage(UserMessage {
4936                    id: Some(UserMessageId::new()),
4937                    content: ContentBlock::Empty,
4938                    chunks: vec!["Injected message (no checkpoint)".into()],
4939                    checkpoint: None,
4940                    indented: false,
4941                }),
4942                cx,
4943            );
4944        });
4945
4946        cx.run_until_parked();
4947        let result = send_task.await;
4948
4949        assert!(
4950            result.is_ok(),
4951            "send should succeed even when new message added during update_last_checkpoint: {:?}",
4952            result.err()
4953        );
4954    }
4955
4956    /// Tests that when a follow-up message is sent during generation,
4957    /// the first turn completing does NOT clear `running_turn` because
4958    /// it now belongs to the second turn.
4959    #[gpui::test]
4960    async fn test_follow_up_message_during_generation_does_not_clear_turn(cx: &mut TestAppContext) {
4961        init_test(cx);
4962
4963        let fs = FakeFs::new(cx.executor());
4964        let project = Project::test(fs, [], cx).await;
4965
4966        // First handler waits for this signal before completing
4967        let (first_complete_tx, first_complete_rx) = futures::channel::oneshot::channel::<()>();
4968        let first_complete_rx = RefCell::new(Some(first_complete_rx));
4969
4970        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4971            move |params, _thread, _cx| {
4972                let first_complete_rx = first_complete_rx.borrow_mut().take();
4973                let is_first = params
4974                    .prompt
4975                    .iter()
4976                    .any(|c| matches!(c, acp::ContentBlock::Text(t) if t.text.contains("first")));
4977
4978                async move {
4979                    if is_first {
4980                        // First handler waits until signaled
4981                        if let Some(rx) = first_complete_rx {
4982                            rx.await.ok();
4983                        }
4984                    }
4985                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4986                }
4987                .boxed_local()
4988            }
4989        }));
4990
4991        let thread = cx
4992            .update(|cx| {
4993                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4994            })
4995            .await
4996            .unwrap();
4997
4998        // Send first message (turn_id=1) - handler will block
4999        let first_request = thread.update(cx, |thread, cx| thread.send_raw("first", cx));
5000        assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 1);
5001
5002        // Send second message (turn_id=2) while first is still blocked
5003        // This calls cancel() which takes turn 1's running_turn and sets turn 2's
5004        let second_request = thread.update(cx, |thread, cx| thread.send_raw("second", cx));
5005        assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 2);
5006
5007        let running_turn_after_second_send =
5008            thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
5009        assert_eq!(
5010            running_turn_after_second_send,
5011            Some(2),
5012            "running_turn should be set to turn 2 after sending second message"
5013        );
5014
5015        // Now signal first handler to complete
5016        first_complete_tx.send(()).ok();
5017
5018        // First request completes - should NOT clear running_turn
5019        // because running_turn now belongs to turn 2
5020        first_request.await.unwrap();
5021
5022        let running_turn_after_first =
5023            thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
5024        assert_eq!(
5025            running_turn_after_first,
5026            Some(2),
5027            "first turn completing should not clear running_turn (belongs to turn 2)"
5028        );
5029
5030        // Second request completes - SHOULD clear running_turn
5031        second_request.await.unwrap();
5032
5033        let running_turn_after_second =
5034            thread.read_with(cx, |thread, _| thread.running_turn.is_some());
5035        assert!(
5036            !running_turn_after_second,
5037            "second turn completing should clear running_turn"
5038        );
5039    }
5040
5041    #[gpui::test]
5042    async fn test_send_returns_cancelled_response_and_marks_tools_as_cancelled(
5043        cx: &mut TestAppContext,
5044    ) {
5045        init_test(cx);
5046
5047        let fs = FakeFs::new(cx.executor());
5048        let project = Project::test(fs, [], cx).await;
5049
5050        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
5051            move |_params, thread, mut cx| {
5052                async move {
5053                    thread
5054                        .update(&mut cx, |thread, cx| {
5055                            thread.handle_session_update(
5056                                acp::SessionUpdate::ToolCall(
5057                                    acp::ToolCall::new(
5058                                        acp::ToolCallId::new("test-tool"),
5059                                        "Test Tool",
5060                                    )
5061                                    .kind(acp::ToolKind::Fetch)
5062                                    .status(acp::ToolCallStatus::InProgress),
5063                                ),
5064                                cx,
5065                            )
5066                        })
5067                        .unwrap()
5068                        .unwrap();
5069
5070                    Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
5071                }
5072                .boxed_local()
5073            },
5074        ));
5075
5076        let thread = cx
5077            .update(|cx| {
5078                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5079            })
5080            .await
5081            .unwrap();
5082
5083        let response = thread
5084            .update(cx, |thread, cx| thread.send_raw("test message", cx))
5085            .await;
5086
5087        let response = response
5088            .expect("send should succeed")
5089            .expect("should have response");
5090        assert_eq!(
5091            response.stop_reason,
5092            acp::StopReason::Cancelled,
5093            "response should have Cancelled stop_reason"
5094        );
5095
5096        thread.read_with(cx, |thread, _| {
5097            let tool_entry = thread
5098                .entries
5099                .iter()
5100                .find_map(|e| {
5101                    if let AgentThreadEntry::ToolCall(call) = e {
5102                        Some(call)
5103                    } else {
5104                        None
5105                    }
5106                })
5107                .expect("should have tool call entry");
5108
5109            assert!(
5110                matches!(tool_entry.status, ToolCallStatus::Canceled),
5111                "tool should be marked as Canceled when response is Cancelled, got {:?}",
5112                tool_entry.status
5113            );
5114        });
5115    }
5116
5117    #[gpui::test]
5118    async fn test_provisional_title_replaced_by_real_title(cx: &mut TestAppContext) {
5119        init_test(cx);
5120
5121        let fs = FakeFs::new(cx.executor());
5122        let project = Project::test(fs, [], cx).await;
5123        let connection = Rc::new(FakeAgentConnection::new());
5124        let set_title_calls = connection.set_title_calls.clone();
5125
5126        let thread = cx
5127            .update(|cx| {
5128                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5129            })
5130            .await
5131            .unwrap();
5132
5133        // Initial title is the default.
5134        thread.read_with(cx, |thread, _| {
5135            assert_eq!(thread.title(), None);
5136        });
5137
5138        // Setting a provisional title updates the display title.
5139        thread.update(cx, |thread, cx| {
5140            thread.set_provisional_title("Hello, can you help…".into(), cx);
5141        });
5142        thread.read_with(cx, |thread, _| {
5143            assert_eq!(
5144                thread.title().as_ref().map(|s| s.as_str()),
5145                Some("Hello, can you help…")
5146            );
5147        });
5148
5149        // The provisional title should NOT have propagated to the connection.
5150        assert_eq!(
5151            set_title_calls.borrow().len(),
5152            0,
5153            "provisional title should not propagate to the connection"
5154        );
5155
5156        // When the real title arrives via set_title, it replaces the
5157        // provisional title and propagates to the connection.
5158        let task = thread.update(cx, |thread, cx| {
5159            thread.set_title("Helping with Rust question".into(), cx)
5160        });
5161        task.await.expect("set_title should succeed");
5162        thread.read_with(cx, |thread, _| {
5163            assert_eq!(
5164                thread.title().as_ref().map(|s| s.as_str()),
5165                Some("Helping with Rust question")
5166            );
5167        });
5168        assert_eq!(
5169            set_title_calls.borrow().as_slice(),
5170            &[SharedString::from("Helping with Rust question")],
5171            "real title should propagate to the connection"
5172        );
5173    }
5174
5175    #[gpui::test]
5176    async fn test_session_info_update_replaces_provisional_title_and_emits_event(
5177        cx: &mut TestAppContext,
5178    ) {
5179        init_test(cx);
5180
5181        let fs = FakeFs::new(cx.executor());
5182        let project = Project::test(fs, [], cx).await;
5183        let connection = Rc::new(FakeAgentConnection::new());
5184
5185        let thread = cx
5186            .update(|cx| {
5187                connection.clone().new_session(
5188                    project,
5189                    PathList::new(&[Path::new(path!("/test"))]),
5190                    cx,
5191                )
5192            })
5193            .await
5194            .unwrap();
5195
5196        let title_updated_events = Rc::new(RefCell::new(0usize));
5197        let title_updated_events_for_subscription = title_updated_events.clone();
5198        thread.update(cx, |_thread, cx| {
5199            cx.subscribe(
5200                &thread,
5201                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
5202                    if matches!(event, AcpThreadEvent::TitleUpdated) {
5203                        *title_updated_events_for_subscription.borrow_mut() += 1;
5204                    }
5205                },
5206            )
5207            .detach();
5208        });
5209
5210        thread.update(cx, |thread, cx| {
5211            thread.set_provisional_title("Hello, can you help…".into(), cx);
5212        });
5213        assert_eq!(
5214            *title_updated_events.borrow(),
5215            1,
5216            "setting a provisional title should emit TitleUpdated"
5217        );
5218
5219        let result = thread.update(cx, |thread, cx| {
5220            thread.handle_session_update(
5221                acp::SessionUpdate::SessionInfoUpdate(
5222                    acp::SessionInfoUpdate::new().title("Helping with Rust question"),
5223                ),
5224                cx,
5225            )
5226        });
5227        result.expect("session info update should succeed");
5228
5229        thread.read_with(cx, |thread, _| {
5230            assert_eq!(
5231                thread.title().as_ref().map(|s| s.as_str()),
5232                Some("Helping with Rust question")
5233            );
5234            assert!(
5235                !thread.has_provisional_title(),
5236                "session info title update should clear provisional title"
5237            );
5238        });
5239
5240        assert_eq!(
5241            *title_updated_events.borrow(),
5242            2,
5243            "session info title update should emit TitleUpdated"
5244        );
5245        assert!(
5246            connection.set_title_calls.borrow().is_empty(),
5247            "session info title update should not propagate back to the connection"
5248        );
5249    }
5250}