acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5use action_log::{ActionLog, ActionLogTelemetry};
   6use agent_client_protocol::{self as acp};
   7use anyhow::{Context as _, Result, anyhow};
   8use collections::HashSet;
   9pub use connection::*;
  10pub use diff::*;
  11use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  12use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  13use itertools::Itertools;
  14use language::language_settings::FormatOnSave;
  15use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  16use markdown::Markdown;
  17pub use mention::*;
  18use project::lsp_store::{FormatTrigger, LspFormatTarget};
  19use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  20use serde::{Deserialize, Serialize};
  21use serde_json::to_string_pretty;
  22use std::collections::HashMap;
  23use std::error::Error;
  24use std::fmt::{Formatter, Write};
  25use std::ops::Range;
  26use std::process::ExitStatus;
  27use std::rc::Rc;
  28use std::time::{Duration, Instant};
  29use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  30use task::{Shell, ShellBuilder};
  31pub use terminal::*;
  32use text::Bias;
  33use ui::App;
  34use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
  35use uuid::Uuid;
  36
  37/// Key used in ACP ToolCall meta to store the tool's programmatic name.
  38/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
  39pub const TOOL_NAME_META_KEY: &str = "tool_name";
  40
  41/// Helper to extract tool name from ACP meta
  42pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
  43    meta.as_ref()
  44        .and_then(|m| m.get(TOOL_NAME_META_KEY))
  45        .and_then(|v| v.as_str())
  46        .map(|s| SharedString::from(s.to_owned()))
  47}
  48
  49/// Helper to create meta with tool name
  50pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
  51    acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
  52}
  53
  54/// Key used in ACP ToolCall meta to store the session id and message indexes
  55pub const SUBAGENT_SESSION_INFO_META_KEY: &str = "subagent_session_info";
  56
  57#[derive(Clone, Debug, Deserialize, Serialize)]
  58pub struct SubagentSessionInfo {
  59    /// The session id of the subagent sessiont that was spawned
  60    pub session_id: acp::SessionId,
  61    /// The index of the message of the start of the "turn" run by this tool call
  62    pub message_start_index: usize,
  63    /// The index of the output of the message that the subagent has returned
  64    #[serde(skip_serializing_if = "Option::is_none")]
  65    pub message_end_index: Option<usize>,
  66}
  67
  68/// Helper to extract subagent session id from ACP meta
  69pub fn subagent_session_info_from_meta(meta: &Option<acp::Meta>) -> Option<SubagentSessionInfo> {
  70    meta.as_ref()
  71        .and_then(|m| m.get(SUBAGENT_SESSION_INFO_META_KEY))
  72        .and_then(|v| serde_json::from_value(v.clone()).ok())
  73}
  74
  75#[derive(Debug)]
  76pub struct UserMessage {
  77    pub id: Option<UserMessageId>,
  78    pub content: ContentBlock,
  79    pub chunks: Vec<acp::ContentBlock>,
  80    pub checkpoint: Option<Checkpoint>,
  81    pub indented: bool,
  82}
  83
  84#[derive(Debug)]
  85pub struct Checkpoint {
  86    git_checkpoint: GitStoreCheckpoint,
  87    pub show: bool,
  88}
  89
  90impl UserMessage {
  91    fn to_markdown(&self, cx: &App) -> String {
  92        let mut markdown = String::new();
  93        if self
  94            .checkpoint
  95            .as_ref()
  96            .is_some_and(|checkpoint| checkpoint.show)
  97        {
  98            writeln!(markdown, "## User (checkpoint)").unwrap();
  99        } else {
 100            writeln!(markdown, "## User").unwrap();
 101        }
 102        writeln!(markdown).unwrap();
 103        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
 104        writeln!(markdown).unwrap();
 105        markdown
 106    }
 107}
 108
 109#[derive(Debug, PartialEq)]
 110pub struct AssistantMessage {
 111    pub chunks: Vec<AssistantMessageChunk>,
 112    pub indented: bool,
 113    pub is_subagent_output: bool,
 114}
 115
 116impl AssistantMessage {
 117    pub fn to_markdown(&self, cx: &App) -> String {
 118        format!(
 119            "## Assistant\n\n{}\n\n",
 120            self.chunks
 121                .iter()
 122                .map(|chunk| chunk.to_markdown(cx))
 123                .join("\n\n")
 124        )
 125    }
 126}
 127
 128#[derive(Debug, PartialEq)]
 129pub enum AssistantMessageChunk {
 130    Message { block: ContentBlock },
 131    Thought { block: ContentBlock },
 132}
 133
 134impl AssistantMessageChunk {
 135    pub fn from_str(
 136        chunk: &str,
 137        language_registry: &Arc<LanguageRegistry>,
 138        path_style: PathStyle,
 139        cx: &mut App,
 140    ) -> Self {
 141        Self::Message {
 142            block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
 143        }
 144    }
 145
 146    fn to_markdown(&self, cx: &App) -> String {
 147        match self {
 148            Self::Message { block } => block.to_markdown(cx).to_string(),
 149            Self::Thought { block } => {
 150                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 151            }
 152        }
 153    }
 154}
 155
 156#[derive(Debug)]
 157pub enum AgentThreadEntry {
 158    UserMessage(UserMessage),
 159    AssistantMessage(AssistantMessage),
 160    ToolCall(ToolCall),
 161}
 162
 163impl AgentThreadEntry {
 164    pub fn is_indented(&self) -> bool {
 165        match self {
 166            Self::UserMessage(message) => message.indented,
 167            Self::AssistantMessage(message) => message.indented,
 168            Self::ToolCall(_) => false,
 169        }
 170    }
 171
 172    pub fn to_markdown(&self, cx: &App) -> String {
 173        match self {
 174            Self::UserMessage(message) => message.to_markdown(cx),
 175            Self::AssistantMessage(message) => message.to_markdown(cx),
 176            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 177        }
 178    }
 179
 180    pub fn user_message(&self) -> Option<&UserMessage> {
 181        if let AgentThreadEntry::UserMessage(message) = self {
 182            Some(message)
 183        } else {
 184            None
 185        }
 186    }
 187
 188    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 189        if let AgentThreadEntry::ToolCall(call) = self {
 190            itertools::Either::Left(call.diffs())
 191        } else {
 192            itertools::Either::Right(std::iter::empty())
 193        }
 194    }
 195
 196    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 197        if let AgentThreadEntry::ToolCall(call) = self {
 198            itertools::Either::Left(call.terminals())
 199        } else {
 200            itertools::Either::Right(std::iter::empty())
 201        }
 202    }
 203
 204    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 205        if let AgentThreadEntry::ToolCall(ToolCall {
 206            locations,
 207            resolved_locations,
 208            ..
 209        }) = self
 210        {
 211            Some((
 212                locations.get(ix)?.clone(),
 213                resolved_locations.get(ix)?.clone()?,
 214            ))
 215        } else {
 216            None
 217        }
 218    }
 219}
 220
 221#[derive(Debug)]
 222pub struct ToolCall {
 223    pub id: acp::ToolCallId,
 224    pub label: Entity<Markdown>,
 225    pub kind: acp::ToolKind,
 226    pub content: Vec<ToolCallContent>,
 227    pub status: ToolCallStatus,
 228    pub locations: Vec<acp::ToolCallLocation>,
 229    pub resolved_locations: Vec<Option<AgentLocation>>,
 230    pub raw_input: Option<serde_json::Value>,
 231    pub raw_input_markdown: Option<Entity<Markdown>>,
 232    pub raw_output: Option<serde_json::Value>,
 233    pub tool_name: Option<SharedString>,
 234    pub subagent_session_info: Option<SubagentSessionInfo>,
 235}
 236
 237impl ToolCall {
 238    fn from_acp(
 239        tool_call: acp::ToolCall,
 240        status: ToolCallStatus,
 241        language_registry: Arc<LanguageRegistry>,
 242        path_style: PathStyle,
 243        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 244        cx: &mut App,
 245    ) -> Result<Self> {
 246        let title = if tool_call.kind == acp::ToolKind::Execute {
 247            tool_call.title
 248        } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
 249            first_line.to_owned() + ""
 250        } else {
 251            tool_call.title
 252        };
 253        let mut content = Vec::with_capacity(tool_call.content.len());
 254        for item in tool_call.content {
 255            if let Some(item) = ToolCallContent::from_acp(
 256                item,
 257                language_registry.clone(),
 258                path_style,
 259                terminals,
 260                cx,
 261            )? {
 262                content.push(item);
 263            }
 264        }
 265
 266        let raw_input_markdown = tool_call
 267            .raw_input
 268            .as_ref()
 269            .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
 270
 271        let tool_name = tool_name_from_meta(&tool_call.meta);
 272
 273        let subagent_session_info = subagent_session_info_from_meta(&tool_call.meta);
 274
 275        let result = Self {
 276            id: tool_call.tool_call_id,
 277            label: cx
 278                .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
 279            kind: tool_call.kind,
 280            content,
 281            locations: tool_call.locations,
 282            resolved_locations: Vec::default(),
 283            status,
 284            raw_input: tool_call.raw_input,
 285            raw_input_markdown,
 286            raw_output: tool_call.raw_output,
 287            tool_name,
 288            subagent_session_info,
 289        };
 290        Ok(result)
 291    }
 292
 293    fn update_fields(
 294        &mut self,
 295        fields: acp::ToolCallUpdateFields,
 296        meta: Option<acp::Meta>,
 297        language_registry: Arc<LanguageRegistry>,
 298        path_style: PathStyle,
 299        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 300        cx: &mut App,
 301    ) -> Result<()> {
 302        let acp::ToolCallUpdateFields {
 303            kind,
 304            status,
 305            title,
 306            content,
 307            locations,
 308            raw_input,
 309            raw_output,
 310            ..
 311        } = fields;
 312
 313        if let Some(kind) = kind {
 314            self.kind = kind;
 315        }
 316
 317        if let Some(status) = status {
 318            self.status = status.into();
 319        }
 320
 321        if let Some(subagent_session_info) = subagent_session_info_from_meta(&meta) {
 322            self.subagent_session_info = Some(subagent_session_info);
 323        }
 324
 325        if let Some(title) = title {
 326            if self.kind == acp::ToolKind::Execute {
 327                for terminal in self.terminals() {
 328                    terminal.update(cx, |terminal, cx| {
 329                        terminal.update_command_label(&title, cx);
 330                    });
 331                }
 332            }
 333            self.label.update(cx, |label, cx| {
 334                if self.kind == acp::ToolKind::Execute {
 335                    label.replace(title, cx);
 336                } else if let Some((first_line, _)) = title.split_once("\n") {
 337                    label.replace(first_line.to_owned() + "", cx);
 338                } else {
 339                    label.replace(title, cx);
 340                }
 341            });
 342        }
 343
 344        if let Some(content) = content {
 345            let mut new_content_len = content.len();
 346            let mut content = content.into_iter();
 347
 348            // Reuse existing content if we can
 349            for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
 350                let valid_content =
 351                    old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
 352                if !valid_content {
 353                    new_content_len -= 1;
 354                }
 355            }
 356            for new in content {
 357                if let Some(new) = ToolCallContent::from_acp(
 358                    new,
 359                    language_registry.clone(),
 360                    path_style,
 361                    terminals,
 362                    cx,
 363                )? {
 364                    self.content.push(new);
 365                } else {
 366                    new_content_len -= 1;
 367                }
 368            }
 369            self.content.truncate(new_content_len);
 370        }
 371
 372        if let Some(locations) = locations {
 373            self.locations = locations;
 374        }
 375
 376        if let Some(raw_input) = raw_input {
 377            self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
 378            self.raw_input = Some(raw_input);
 379        }
 380
 381        if let Some(raw_output) = raw_output {
 382            if self.content.is_empty()
 383                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 384            {
 385                self.content
 386                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 387                        markdown,
 388                    }));
 389            }
 390            self.raw_output = Some(raw_output);
 391        }
 392        Ok(())
 393    }
 394
 395    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 396        self.content.iter().filter_map(|content| match content {
 397            ToolCallContent::Diff(diff) => Some(diff),
 398            ToolCallContent::ContentBlock(_) => None,
 399            ToolCallContent::Terminal(_) => None,
 400        })
 401    }
 402
 403    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 404        self.content.iter().filter_map(|content| match content {
 405            ToolCallContent::Terminal(terminal) => Some(terminal),
 406            ToolCallContent::ContentBlock(_) => None,
 407            ToolCallContent::Diff(_) => None,
 408        })
 409    }
 410
 411    pub fn is_subagent(&self) -> bool {
 412        self.tool_name.as_ref().is_some_and(|s| s == "spawn_agent")
 413            || self.subagent_session_info.is_some()
 414    }
 415
 416    pub fn to_markdown(&self, cx: &App) -> String {
 417        let mut markdown = format!(
 418            "**Tool Call: {}**\nStatus: {}\n\n",
 419            self.label.read(cx).source(),
 420            self.status
 421        );
 422        for content in &self.content {
 423            markdown.push_str(content.to_markdown(cx).as_str());
 424            markdown.push_str("\n\n");
 425        }
 426        markdown
 427    }
 428
 429    async fn resolve_location(
 430        location: acp::ToolCallLocation,
 431        project: WeakEntity<Project>,
 432        cx: &mut AsyncApp,
 433    ) -> Option<ResolvedLocation> {
 434        let buffer = project
 435            .update(cx, |project, cx| {
 436                project
 437                    .project_path_for_absolute_path(&location.path, cx)
 438                    .map(|path| project.open_buffer(path, cx))
 439            })
 440            .ok()??;
 441        let buffer = buffer.await.log_err()?;
 442        let position = buffer.update(cx, |buffer, _| {
 443            let snapshot = buffer.snapshot();
 444            if let Some(row) = location.line {
 445                let column = snapshot.indent_size_for_line(row).len;
 446                let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 447                snapshot.anchor_before(point)
 448            } else {
 449                Anchor::min_for_buffer(snapshot.remote_id())
 450            }
 451        });
 452
 453        Some(ResolvedLocation { buffer, position })
 454    }
 455
 456    fn resolve_locations(
 457        &self,
 458        project: Entity<Project>,
 459        cx: &mut App,
 460    ) -> Task<Vec<Option<ResolvedLocation>>> {
 461        let locations = self.locations.clone();
 462        project.update(cx, |_, cx| {
 463            cx.spawn(async move |project, cx| {
 464                let mut new_locations = Vec::new();
 465                for location in locations {
 466                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 467                }
 468                new_locations
 469            })
 470        })
 471    }
 472}
 473
 474// Separate so we can hold a strong reference to the buffer
 475// for saving on the thread
 476#[derive(Clone, Debug, PartialEq, Eq)]
 477struct ResolvedLocation {
 478    buffer: Entity<Buffer>,
 479    position: Anchor,
 480}
 481
 482impl From<&ResolvedLocation> for AgentLocation {
 483    fn from(value: &ResolvedLocation) -> Self {
 484        Self {
 485            buffer: value.buffer.downgrade(),
 486            position: value.position,
 487        }
 488    }
 489}
 490
 491#[derive(Debug)]
 492pub enum ToolCallStatus {
 493    /// The tool call hasn't started running yet, but we start showing it to
 494    /// the user.
 495    Pending,
 496    /// The tool call is waiting for confirmation from the user.
 497    WaitingForConfirmation {
 498        options: PermissionOptions,
 499        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
 500    },
 501    /// The tool call is currently running.
 502    InProgress,
 503    /// The tool call completed successfully.
 504    Completed,
 505    /// The tool call failed.
 506    Failed,
 507    /// The user rejected the tool call.
 508    Rejected,
 509    /// The user canceled generation so the tool call was canceled.
 510    Canceled,
 511}
 512
 513impl From<acp::ToolCallStatus> for ToolCallStatus {
 514    fn from(status: acp::ToolCallStatus) -> Self {
 515        match status {
 516            acp::ToolCallStatus::Pending => Self::Pending,
 517            acp::ToolCallStatus::InProgress => Self::InProgress,
 518            acp::ToolCallStatus::Completed => Self::Completed,
 519            acp::ToolCallStatus::Failed => Self::Failed,
 520            _ => Self::Pending,
 521        }
 522    }
 523}
 524
 525impl Display for ToolCallStatus {
 526    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 527        write!(
 528            f,
 529            "{}",
 530            match self {
 531                ToolCallStatus::Pending => "Pending",
 532                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 533                ToolCallStatus::InProgress => "In Progress",
 534                ToolCallStatus::Completed => "Completed",
 535                ToolCallStatus::Failed => "Failed",
 536                ToolCallStatus::Rejected => "Rejected",
 537                ToolCallStatus::Canceled => "Canceled",
 538            }
 539        )
 540    }
 541}
 542
 543#[derive(Debug, PartialEq, Clone)]
 544pub enum ContentBlock {
 545    Empty,
 546    Markdown { markdown: Entity<Markdown> },
 547    ResourceLink { resource_link: acp::ResourceLink },
 548    Image { image: Arc<gpui::Image> },
 549}
 550
 551impl ContentBlock {
 552    pub fn new(
 553        block: acp::ContentBlock,
 554        language_registry: &Arc<LanguageRegistry>,
 555        path_style: PathStyle,
 556        cx: &mut App,
 557    ) -> Self {
 558        let mut this = Self::Empty;
 559        this.append(block, language_registry, path_style, cx);
 560        this
 561    }
 562
 563    pub fn new_combined(
 564        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 565        language_registry: Arc<LanguageRegistry>,
 566        path_style: PathStyle,
 567        cx: &mut App,
 568    ) -> Self {
 569        let mut this = Self::Empty;
 570        for block in blocks {
 571            this.append(block, &language_registry, path_style, cx);
 572        }
 573        this
 574    }
 575
 576    pub fn append(
 577        &mut self,
 578        block: acp::ContentBlock,
 579        language_registry: &Arc<LanguageRegistry>,
 580        path_style: PathStyle,
 581        cx: &mut App,
 582    ) {
 583        match (&mut *self, &block) {
 584            (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
 585                *self = ContentBlock::ResourceLink {
 586                    resource_link: resource_link.clone(),
 587                };
 588            }
 589            (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
 590                if let Some(image) = Self::decode_image(image_content) {
 591                    *self = ContentBlock::Image { image };
 592                } else {
 593                    let new_content = Self::image_md(image_content);
 594                    *self = Self::create_markdown_block(new_content, language_registry, cx);
 595                }
 596            }
 597            (ContentBlock::Empty, _) => {
 598                let new_content = Self::block_string_contents(&block, path_style);
 599                *self = Self::create_markdown_block(new_content, language_registry, cx);
 600            }
 601            (ContentBlock::Markdown { markdown }, _) => {
 602                let new_content = Self::block_string_contents(&block, path_style);
 603                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 604            }
 605            (ContentBlock::ResourceLink { resource_link }, _) => {
 606                let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
 607                let new_content = Self::block_string_contents(&block, path_style);
 608                let combined = format!("{}\n{}", existing_content, new_content);
 609                *self = Self::create_markdown_block(combined, language_registry, cx);
 610            }
 611            (ContentBlock::Image { .. }, _) => {
 612                let new_content = Self::block_string_contents(&block, path_style);
 613                let combined = format!("`Image`\n{}", new_content);
 614                *self = Self::create_markdown_block(combined, language_registry, cx);
 615            }
 616        }
 617    }
 618
 619    fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
 620        use base64::Engine as _;
 621
 622        let bytes = base64::engine::general_purpose::STANDARD
 623            .decode(image_content.data.as_bytes())
 624            .ok()?;
 625        let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
 626        Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
 627    }
 628
 629    fn create_markdown_block(
 630        content: String,
 631        language_registry: &Arc<LanguageRegistry>,
 632        cx: &mut App,
 633    ) -> ContentBlock {
 634        ContentBlock::Markdown {
 635            markdown: cx
 636                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 637        }
 638    }
 639
 640    fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
 641        match block {
 642            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 643            acp::ContentBlock::ResourceLink(resource_link) => {
 644                Self::resource_link_md(&resource_link.uri, path_style)
 645            }
 646            acp::ContentBlock::Resource(acp::EmbeddedResource {
 647                resource:
 648                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 649                        uri,
 650                        ..
 651                    }),
 652                ..
 653            }) => Self::resource_link_md(uri, path_style),
 654            acp::ContentBlock::Image(image) => Self::image_md(image),
 655            _ => String::new(),
 656        }
 657    }
 658
 659    fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
 660        if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
 661            uri.as_link().to_string()
 662        } else {
 663            uri.to_string()
 664        }
 665    }
 666
 667    fn image_md(_image: &acp::ImageContent) -> String {
 668        "`Image`".into()
 669    }
 670
 671    pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 672        match self {
 673            ContentBlock::Empty => "",
 674            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 675            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 676            ContentBlock::Image { .. } => "`Image`",
 677        }
 678    }
 679
 680    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 681        match self {
 682            ContentBlock::Empty => None,
 683            ContentBlock::Markdown { markdown } => Some(markdown),
 684            ContentBlock::ResourceLink { .. } => None,
 685            ContentBlock::Image { .. } => None,
 686        }
 687    }
 688
 689    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 690        match self {
 691            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 692            _ => None,
 693        }
 694    }
 695
 696    pub fn image(&self) -> Option<&Arc<gpui::Image>> {
 697        match self {
 698            ContentBlock::Image { image } => Some(image),
 699            _ => None,
 700        }
 701    }
 702}
 703
 704#[derive(Debug)]
 705pub enum ToolCallContent {
 706    ContentBlock(ContentBlock),
 707    Diff(Entity<Diff>),
 708    Terminal(Entity<Terminal>),
 709}
 710
 711impl ToolCallContent {
 712    pub fn from_acp(
 713        content: acp::ToolCallContent,
 714        language_registry: Arc<LanguageRegistry>,
 715        path_style: PathStyle,
 716        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 717        cx: &mut App,
 718    ) -> Result<Option<Self>> {
 719        match content {
 720            acp::ToolCallContent::Content(acp::Content { content, .. }) => {
 721                Ok(Some(Self::ContentBlock(ContentBlock::new(
 722                    content,
 723                    &language_registry,
 724                    path_style,
 725                    cx,
 726                ))))
 727            }
 728            acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
 729                Diff::finalized(
 730                    diff.path.to_string_lossy().into_owned(),
 731                    diff.old_text,
 732                    diff.new_text,
 733                    language_registry,
 734                    cx,
 735                )
 736            })))),
 737            acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
 738                .get(&terminal_id)
 739                .cloned()
 740                .map(|terminal| Some(Self::Terminal(terminal)))
 741                .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
 742            _ => Ok(None),
 743        }
 744    }
 745
 746    pub fn update_from_acp(
 747        &mut self,
 748        new: acp::ToolCallContent,
 749        language_registry: Arc<LanguageRegistry>,
 750        path_style: PathStyle,
 751        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 752        cx: &mut App,
 753    ) -> Result<bool> {
 754        let needs_update = match (&self, &new) {
 755            (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
 756                old_diff.read(cx).needs_update(
 757                    new_diff.old_text.as_deref().unwrap_or(""),
 758                    &new_diff.new_text,
 759                    cx,
 760                )
 761            }
 762            _ => true,
 763        };
 764
 765        if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
 766            if needs_update {
 767                *self = update;
 768            }
 769            Ok(true)
 770        } else {
 771            Ok(false)
 772        }
 773    }
 774
 775    pub fn to_markdown(&self, cx: &App) -> String {
 776        match self {
 777            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 778            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 779            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 780        }
 781    }
 782
 783    pub fn image(&self) -> Option<&Arc<gpui::Image>> {
 784        match self {
 785            Self::ContentBlock(content) => content.image(),
 786            _ => None,
 787        }
 788    }
 789}
 790
 791#[derive(Debug, PartialEq)]
 792pub enum ToolCallUpdate {
 793    UpdateFields(acp::ToolCallUpdate),
 794    UpdateDiff(ToolCallUpdateDiff),
 795    UpdateTerminal(ToolCallUpdateTerminal),
 796}
 797
 798impl ToolCallUpdate {
 799    fn id(&self) -> &acp::ToolCallId {
 800        match self {
 801            Self::UpdateFields(update) => &update.tool_call_id,
 802            Self::UpdateDiff(diff) => &diff.id,
 803            Self::UpdateTerminal(terminal) => &terminal.id,
 804        }
 805    }
 806}
 807
 808impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 809    fn from(update: acp::ToolCallUpdate) -> Self {
 810        Self::UpdateFields(update)
 811    }
 812}
 813
 814impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 815    fn from(diff: ToolCallUpdateDiff) -> Self {
 816        Self::UpdateDiff(diff)
 817    }
 818}
 819
 820#[derive(Debug, PartialEq)]
 821pub struct ToolCallUpdateDiff {
 822    pub id: acp::ToolCallId,
 823    pub diff: Entity<Diff>,
 824}
 825
 826impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 827    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 828        Self::UpdateTerminal(terminal)
 829    }
 830}
 831
 832#[derive(Debug, PartialEq)]
 833pub struct ToolCallUpdateTerminal {
 834    pub id: acp::ToolCallId,
 835    pub terminal: Entity<Terminal>,
 836}
 837
 838#[derive(Debug, Default)]
 839pub struct Plan {
 840    pub entries: Vec<PlanEntry>,
 841}
 842
 843#[derive(Debug)]
 844pub struct PlanStats<'a> {
 845    pub in_progress_entry: Option<&'a PlanEntry>,
 846    pub pending: u32,
 847    pub completed: u32,
 848}
 849
 850impl Plan {
 851    pub fn is_empty(&self) -> bool {
 852        self.entries.is_empty()
 853    }
 854
 855    pub fn stats(&self) -> PlanStats<'_> {
 856        let mut stats = PlanStats {
 857            in_progress_entry: None,
 858            pending: 0,
 859            completed: 0,
 860        };
 861
 862        for entry in &self.entries {
 863            match &entry.status {
 864                acp::PlanEntryStatus::Pending => {
 865                    stats.pending += 1;
 866                }
 867                acp::PlanEntryStatus::InProgress => {
 868                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 869                }
 870                acp::PlanEntryStatus::Completed => {
 871                    stats.completed += 1;
 872                }
 873                _ => {}
 874            }
 875        }
 876
 877        stats
 878    }
 879}
 880
 881#[derive(Debug)]
 882pub struct PlanEntry {
 883    pub content: Entity<Markdown>,
 884    pub priority: acp::PlanEntryPriority,
 885    pub status: acp::PlanEntryStatus,
 886}
 887
 888impl PlanEntry {
 889    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 890        Self {
 891            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 892            priority: entry.priority,
 893            status: entry.status,
 894        }
 895    }
 896}
 897
 898#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 899pub struct TokenUsage {
 900    pub max_tokens: u64,
 901    pub used_tokens: u64,
 902    pub input_tokens: u64,
 903    pub output_tokens: u64,
 904    pub max_output_tokens: Option<u64>,
 905}
 906
 907pub const TOKEN_USAGE_WARNING_THRESHOLD: f32 = 0.8;
 908
 909impl TokenUsage {
 910    pub fn ratio(&self) -> TokenUsageRatio {
 911        #[cfg(debug_assertions)]
 912        let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
 913            .unwrap_or(TOKEN_USAGE_WARNING_THRESHOLD.to_string())
 914            .parse()
 915            .unwrap();
 916        #[cfg(not(debug_assertions))]
 917        let warning_threshold: f32 = TOKEN_USAGE_WARNING_THRESHOLD;
 918
 919        // When the maximum is unknown because there is no selected model,
 920        // avoid showing the token limit warning.
 921        if self.max_tokens == 0 {
 922            TokenUsageRatio::Normal
 923        } else if self.used_tokens >= self.max_tokens {
 924            TokenUsageRatio::Exceeded
 925        } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
 926            TokenUsageRatio::Warning
 927        } else {
 928            TokenUsageRatio::Normal
 929        }
 930    }
 931}
 932
 933#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
 934pub enum TokenUsageRatio {
 935    Normal,
 936    Warning,
 937    Exceeded,
 938}
 939
 940#[derive(Debug, Clone)]
 941pub struct RetryStatus {
 942    pub last_error: SharedString,
 943    pub attempt: usize,
 944    pub max_attempts: usize,
 945    pub started_at: Instant,
 946    pub duration: Duration,
 947}
 948
 949struct RunningTurn {
 950    id: u32,
 951    send_task: Task<()>,
 952}
 953
 954pub struct AcpThread {
 955    session_id: acp::SessionId,
 956    cwd: Option<PathBuf>,
 957    parent_session_id: Option<acp::SessionId>,
 958    title: SharedString,
 959    provisional_title: Option<SharedString>,
 960    entries: Vec<AgentThreadEntry>,
 961    plan: Plan,
 962    project: Entity<Project>,
 963    action_log: Entity<ActionLog>,
 964    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
 965    turn_id: u32,
 966    running_turn: Option<RunningTurn>,
 967    connection: Rc<dyn AgentConnection>,
 968    token_usage: Option<TokenUsage>,
 969    prompt_capabilities: acp::PromptCapabilities,
 970    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
 971    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
 972    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
 973    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
 974    had_error: bool,
 975    /// The user's unsent prompt text, persisted so it can be restored when reloading the thread.
 976    draft_prompt: Option<Vec<acp::ContentBlock>>,
 977    /// The initial scroll position for the thread view, set during session registration.
 978    ui_scroll_position: Option<gpui::ListOffset>,
 979}
 980
 981impl From<&AcpThread> for ActionLogTelemetry {
 982    fn from(value: &AcpThread) -> Self {
 983        Self {
 984            agent_telemetry_id: value.connection().telemetry_id(),
 985            session_id: value.session_id.0.clone(),
 986        }
 987    }
 988}
 989
 990#[derive(Debug)]
 991pub enum AcpThreadEvent {
 992    NewEntry,
 993    TitleUpdated,
 994    TokenUsageUpdated,
 995    EntryUpdated(usize),
 996    EntriesRemoved(Range<usize>),
 997    ToolAuthorizationRequested(acp::ToolCallId),
 998    ToolAuthorizationReceived(acp::ToolCallId),
 999    Retry(RetryStatus),
1000    SubagentSpawned(acp::SessionId),
1001    Stopped(acp::StopReason),
1002    Error,
1003    LoadError(LoadError),
1004    PromptCapabilitiesUpdated,
1005    Refusal,
1006    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
1007    ModeUpdated(acp::SessionModeId),
1008    ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
1009}
1010
1011impl EventEmitter<AcpThreadEvent> for AcpThread {}
1012
1013#[derive(Debug, Clone)]
1014pub enum TerminalProviderEvent {
1015    Created {
1016        terminal_id: acp::TerminalId,
1017        label: String,
1018        cwd: Option<PathBuf>,
1019        output_byte_limit: Option<u64>,
1020        terminal: Entity<::terminal::Terminal>,
1021    },
1022    Output {
1023        terminal_id: acp::TerminalId,
1024        data: Vec<u8>,
1025    },
1026    TitleChanged {
1027        terminal_id: acp::TerminalId,
1028        title: String,
1029    },
1030    Exit {
1031        terminal_id: acp::TerminalId,
1032        status: acp::TerminalExitStatus,
1033    },
1034}
1035
1036#[derive(Debug, Clone)]
1037pub enum TerminalProviderCommand {
1038    WriteInput {
1039        terminal_id: acp::TerminalId,
1040        bytes: Vec<u8>,
1041    },
1042    Resize {
1043        terminal_id: acp::TerminalId,
1044        cols: u16,
1045        rows: u16,
1046    },
1047    Close {
1048        terminal_id: acp::TerminalId,
1049    },
1050}
1051
1052#[derive(PartialEq, Eq, Debug)]
1053pub enum ThreadStatus {
1054    Idle,
1055    Generating,
1056}
1057
1058#[derive(Debug, Clone)]
1059pub enum LoadError {
1060    Unsupported {
1061        command: SharedString,
1062        current_version: SharedString,
1063        minimum_version: SharedString,
1064    },
1065    FailedToInstall(SharedString),
1066    Exited {
1067        status: ExitStatus,
1068    },
1069    Other(SharedString),
1070}
1071
1072impl Display for LoadError {
1073    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1074        match self {
1075            LoadError::Unsupported {
1076                command: path,
1077                current_version,
1078                minimum_version,
1079            } => {
1080                write!(
1081                    f,
1082                    "version {current_version} from {path} is not supported (need at least {minimum_version})"
1083                )
1084            }
1085            LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1086            LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1087            LoadError::Other(msg) => write!(f, "{msg}"),
1088        }
1089    }
1090}
1091
1092impl Error for LoadError {}
1093
1094impl AcpThread {
1095    pub fn new(
1096        parent_session_id: Option<acp::SessionId>,
1097        title: impl Into<SharedString>,
1098        cwd: Option<PathBuf>,
1099        connection: Rc<dyn AgentConnection>,
1100        project: Entity<Project>,
1101        action_log: Entity<ActionLog>,
1102        session_id: acp::SessionId,
1103        mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1104        cx: &mut Context<Self>,
1105    ) -> Self {
1106        let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1107        let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1108            loop {
1109                let caps = prompt_capabilities_rx.recv().await?;
1110                this.update(cx, |this, cx| {
1111                    this.prompt_capabilities = caps;
1112                    cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1113                })?;
1114            }
1115        });
1116
1117        Self {
1118            parent_session_id,
1119            cwd,
1120            action_log,
1121            shared_buffers: Default::default(),
1122            entries: Default::default(),
1123            plan: Default::default(),
1124            title: title.into(),
1125            provisional_title: None,
1126            project,
1127            running_turn: None,
1128            turn_id: 0,
1129            connection,
1130            session_id,
1131            token_usage: None,
1132            prompt_capabilities,
1133            _observe_prompt_capabilities: task,
1134            terminals: HashMap::default(),
1135            pending_terminal_output: HashMap::default(),
1136            pending_terminal_exit: HashMap::default(),
1137            had_error: false,
1138            draft_prompt: None,
1139            ui_scroll_position: None,
1140        }
1141    }
1142
1143    pub fn parent_session_id(&self) -> Option<&acp::SessionId> {
1144        self.parent_session_id.as_ref()
1145    }
1146
1147    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1148        self.prompt_capabilities.clone()
1149    }
1150
1151    pub fn draft_prompt(&self) -> Option<&[acp::ContentBlock]> {
1152        self.draft_prompt.as_deref()
1153    }
1154
1155    pub fn set_draft_prompt(&mut self, prompt: Option<Vec<acp::ContentBlock>>) {
1156        self.draft_prompt = prompt;
1157    }
1158
1159    pub fn ui_scroll_position(&self) -> Option<gpui::ListOffset> {
1160        self.ui_scroll_position
1161    }
1162
1163    pub fn set_ui_scroll_position(&mut self, position: Option<gpui::ListOffset>) {
1164        self.ui_scroll_position = position;
1165    }
1166
1167    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1168        &self.connection
1169    }
1170
1171    pub fn action_log(&self) -> &Entity<ActionLog> {
1172        &self.action_log
1173    }
1174
1175    pub fn project(&self) -> &Entity<Project> {
1176        &self.project
1177    }
1178
1179    pub fn title(&self) -> SharedString {
1180        self.provisional_title
1181            .clone()
1182            .unwrap_or_else(|| self.title.clone())
1183    }
1184
1185    pub fn entries(&self) -> &[AgentThreadEntry] {
1186        &self.entries
1187    }
1188
1189    pub fn session_id(&self) -> &acp::SessionId {
1190        &self.session_id
1191    }
1192
1193    pub fn cwd(&self) -> Option<&PathBuf> {
1194        self.cwd.as_ref()
1195    }
1196
1197    pub fn status(&self) -> ThreadStatus {
1198        if self.running_turn.is_some() {
1199            ThreadStatus::Generating
1200        } else {
1201            ThreadStatus::Idle
1202        }
1203    }
1204
1205    pub fn had_error(&self) -> bool {
1206        self.had_error
1207    }
1208
1209    pub fn is_waiting_for_confirmation(&self) -> bool {
1210        for entry in self.entries.iter().rev() {
1211            match entry {
1212                AgentThreadEntry::UserMessage(_) => return false,
1213                AgentThreadEntry::ToolCall(ToolCall {
1214                    status: ToolCallStatus::WaitingForConfirmation { .. },
1215                    ..
1216                }) => return true,
1217                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1218            }
1219        }
1220        false
1221    }
1222
1223    pub fn token_usage(&self) -> Option<&TokenUsage> {
1224        self.token_usage.as_ref()
1225    }
1226
1227    pub fn has_pending_edit_tool_calls(&self) -> bool {
1228        for entry in self.entries.iter().rev() {
1229            match entry {
1230                AgentThreadEntry::UserMessage(_) => return false,
1231                AgentThreadEntry::ToolCall(
1232                    call @ ToolCall {
1233                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1234                        ..
1235                    },
1236                ) if call.diffs().next().is_some() => {
1237                    return true;
1238                }
1239                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1240            }
1241        }
1242
1243        false
1244    }
1245
1246    pub fn has_in_progress_tool_calls(&self) -> bool {
1247        for entry in self.entries.iter().rev() {
1248            match entry {
1249                AgentThreadEntry::UserMessage(_) => return false,
1250                AgentThreadEntry::ToolCall(ToolCall {
1251                    status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1252                    ..
1253                }) => {
1254                    return true;
1255                }
1256                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1257            }
1258        }
1259
1260        false
1261    }
1262
1263    pub fn used_tools_since_last_user_message(&self) -> bool {
1264        for entry in self.entries.iter().rev() {
1265            match entry {
1266                AgentThreadEntry::UserMessage(..) => return false,
1267                AgentThreadEntry::AssistantMessage(..) => continue,
1268                AgentThreadEntry::ToolCall(..) => return true,
1269            }
1270        }
1271
1272        false
1273    }
1274
1275    pub fn handle_session_update(
1276        &mut self,
1277        update: acp::SessionUpdate,
1278        cx: &mut Context<Self>,
1279    ) -> Result<(), acp::Error> {
1280        match update {
1281            acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1282                self.push_user_content_block(None, content, cx);
1283            }
1284            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1285                self.push_assistant_content_block(content, false, cx);
1286            }
1287            acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1288                self.push_assistant_content_block(content, true, cx);
1289            }
1290            acp::SessionUpdate::ToolCall(tool_call) => {
1291                self.upsert_tool_call(tool_call, cx)?;
1292            }
1293            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1294                self.update_tool_call(tool_call_update, cx)?;
1295            }
1296            acp::SessionUpdate::Plan(plan) => {
1297                self.update_plan(plan, cx);
1298            }
1299            acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1300                available_commands,
1301                ..
1302            }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1303            acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1304                current_mode_id,
1305                ..
1306            }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1307            acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1308                config_options,
1309                ..
1310            }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1311            _ => {}
1312        }
1313        Ok(())
1314    }
1315
1316    pub fn push_user_content_block(
1317        &mut self,
1318        message_id: Option<UserMessageId>,
1319        chunk: acp::ContentBlock,
1320        cx: &mut Context<Self>,
1321    ) {
1322        self.push_user_content_block_with_indent(message_id, chunk, false, cx)
1323    }
1324
1325    pub fn push_user_content_block_with_indent(
1326        &mut self,
1327        message_id: Option<UserMessageId>,
1328        chunk: acp::ContentBlock,
1329        indented: bool,
1330        cx: &mut Context<Self>,
1331    ) {
1332        let language_registry = self.project.read(cx).languages().clone();
1333        let path_style = self.project.read(cx).path_style(cx);
1334        let entries_len = self.entries.len();
1335
1336        if let Some(last_entry) = self.entries.last_mut()
1337            && let AgentThreadEntry::UserMessage(UserMessage {
1338                id,
1339                content,
1340                chunks,
1341                indented: existing_indented,
1342                ..
1343            }) = last_entry
1344            && *existing_indented == indented
1345        {
1346            *id = message_id.or(id.take());
1347            content.append(chunk.clone(), &language_registry, path_style, cx);
1348            chunks.push(chunk);
1349            let idx = entries_len - 1;
1350            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1351        } else {
1352            let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1353            self.push_entry(
1354                AgentThreadEntry::UserMessage(UserMessage {
1355                    id: message_id,
1356                    content,
1357                    chunks: vec![chunk],
1358                    checkpoint: None,
1359                    indented,
1360                }),
1361                cx,
1362            );
1363        }
1364    }
1365
1366    pub fn push_assistant_content_block(
1367        &mut self,
1368        chunk: acp::ContentBlock,
1369        is_thought: bool,
1370        cx: &mut Context<Self>,
1371    ) {
1372        self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
1373    }
1374
1375    pub fn push_assistant_content_block_with_indent(
1376        &mut self,
1377        chunk: acp::ContentBlock,
1378        is_thought: bool,
1379        indented: bool,
1380        cx: &mut Context<Self>,
1381    ) {
1382        let language_registry = self.project.read(cx).languages().clone();
1383        let path_style = self.project.read(cx).path_style(cx);
1384        let entries_len = self.entries.len();
1385        if let Some(last_entry) = self.entries.last_mut()
1386            && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1387                chunks,
1388                indented: existing_indented,
1389                is_subagent_output: _,
1390            }) = last_entry
1391            && *existing_indented == indented
1392        {
1393            let idx = entries_len - 1;
1394            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1395            match (chunks.last_mut(), is_thought) {
1396                (Some(AssistantMessageChunk::Message { block }), false)
1397                | (Some(AssistantMessageChunk::Thought { block }), true) => {
1398                    block.append(chunk, &language_registry, path_style, cx)
1399                }
1400                _ => {
1401                    let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1402                    if is_thought {
1403                        chunks.push(AssistantMessageChunk::Thought { block })
1404                    } else {
1405                        chunks.push(AssistantMessageChunk::Message { block })
1406                    }
1407                }
1408            }
1409        } else {
1410            let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1411            let chunk = if is_thought {
1412                AssistantMessageChunk::Thought { block }
1413            } else {
1414                AssistantMessageChunk::Message { block }
1415            };
1416
1417            self.push_entry(
1418                AgentThreadEntry::AssistantMessage(AssistantMessage {
1419                    chunks: vec![chunk],
1420                    indented,
1421                    is_subagent_output: false,
1422                }),
1423                cx,
1424            );
1425        }
1426    }
1427
1428    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1429        self.entries.push(entry);
1430        cx.emit(AcpThreadEvent::NewEntry);
1431    }
1432
1433    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1434        self.connection.set_title(&self.session_id, cx).is_some()
1435    }
1436
1437    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1438        let had_provisional = self.provisional_title.take().is_some();
1439        if title != self.title {
1440            self.title = title.clone();
1441            cx.emit(AcpThreadEvent::TitleUpdated);
1442            if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1443                return set_title.run(title, cx);
1444            }
1445        } else if had_provisional {
1446            cx.emit(AcpThreadEvent::TitleUpdated);
1447        }
1448        Task::ready(Ok(()))
1449    }
1450
1451    /// Sets a provisional display title without propagating back to the
1452    /// underlying agent connection. This is used for quick preview titles
1453    /// (e.g. first 20 chars of the user message) that should be shown
1454    /// immediately but replaced once the LLM generates a proper title via
1455    /// `set_title`.
1456    pub fn has_provisional_title(&self) -> bool {
1457        self.provisional_title.is_some()
1458    }
1459
1460    pub fn provisional_title(&self) -> Option<&SharedString> {
1461        self.provisional_title.as_ref()
1462    }
1463
1464    pub fn set_provisional_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
1465        self.provisional_title = Some(title);
1466        cx.emit(AcpThreadEvent::TitleUpdated);
1467    }
1468
1469    pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
1470        cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
1471    }
1472
1473    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1474        self.token_usage = usage;
1475        cx.emit(AcpThreadEvent::TokenUsageUpdated);
1476    }
1477
1478    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1479        cx.emit(AcpThreadEvent::Retry(status));
1480    }
1481
1482    pub fn update_tool_call(
1483        &mut self,
1484        update: impl Into<ToolCallUpdate>,
1485        cx: &mut Context<Self>,
1486    ) -> Result<()> {
1487        let update = update.into();
1488        let languages = self.project.read(cx).languages().clone();
1489        let path_style = self.project.read(cx).path_style(cx);
1490
1491        let ix = match self.index_for_tool_call(update.id()) {
1492            Some(ix) => ix,
1493            None => {
1494                // Tool call not found - create a failed tool call entry
1495                let failed_tool_call = ToolCall {
1496                    id: update.id().clone(),
1497                    label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1498                    kind: acp::ToolKind::Fetch,
1499                    content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1500                        "Tool call not found".into(),
1501                        &languages,
1502                        path_style,
1503                        cx,
1504                    ))],
1505                    status: ToolCallStatus::Failed,
1506                    locations: Vec::new(),
1507                    resolved_locations: Vec::new(),
1508                    raw_input: None,
1509                    raw_input_markdown: None,
1510                    raw_output: None,
1511                    tool_name: None,
1512                    subagent_session_info: None,
1513                };
1514                self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1515                return Ok(());
1516            }
1517        };
1518        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1519            unreachable!()
1520        };
1521
1522        match update {
1523            ToolCallUpdate::UpdateFields(update) => {
1524                let location_updated = update.fields.locations.is_some();
1525                call.update_fields(
1526                    update.fields,
1527                    update.meta,
1528                    languages,
1529                    path_style,
1530                    &self.terminals,
1531                    cx,
1532                )?;
1533                if location_updated {
1534                    self.resolve_locations(update.tool_call_id, cx);
1535                }
1536            }
1537            ToolCallUpdate::UpdateDiff(update) => {
1538                call.content.clear();
1539                call.content.push(ToolCallContent::Diff(update.diff));
1540            }
1541            ToolCallUpdate::UpdateTerminal(update) => {
1542                call.content.clear();
1543                call.content
1544                    .push(ToolCallContent::Terminal(update.terminal));
1545            }
1546        }
1547
1548        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1549
1550        Ok(())
1551    }
1552
1553    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1554    pub fn upsert_tool_call(
1555        &mut self,
1556        tool_call: acp::ToolCall,
1557        cx: &mut Context<Self>,
1558    ) -> Result<(), acp::Error> {
1559        let status = tool_call.status.into();
1560        self.upsert_tool_call_inner(tool_call.into(), status, cx)
1561    }
1562
1563    /// Fails if id does not match an existing entry.
1564    pub fn upsert_tool_call_inner(
1565        &mut self,
1566        update: acp::ToolCallUpdate,
1567        status: ToolCallStatus,
1568        cx: &mut Context<Self>,
1569    ) -> Result<(), acp::Error> {
1570        let language_registry = self.project.read(cx).languages().clone();
1571        let path_style = self.project.read(cx).path_style(cx);
1572        let id = update.tool_call_id.clone();
1573
1574        let agent_telemetry_id = self.connection().telemetry_id();
1575        let session = self.session_id();
1576        let parent_session_id = self.parent_session_id();
1577        if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1578            let status = if matches!(status, ToolCallStatus::Completed) {
1579                "completed"
1580            } else {
1581                "failed"
1582            };
1583            telemetry::event!(
1584                "Agent Tool Call Completed",
1585                agent_telemetry_id,
1586                session,
1587                parent_session_id,
1588                status
1589            );
1590        }
1591
1592        if let Some(ix) = self.index_for_tool_call(&id) {
1593            let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1594                unreachable!()
1595            };
1596
1597            call.update_fields(
1598                update.fields,
1599                update.meta,
1600                language_registry,
1601                path_style,
1602                &self.terminals,
1603                cx,
1604            )?;
1605            call.status = status;
1606
1607            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1608        } else {
1609            let call = ToolCall::from_acp(
1610                update.try_into()?,
1611                status,
1612                language_registry,
1613                self.project.read(cx).path_style(cx),
1614                &self.terminals,
1615                cx,
1616            )?;
1617            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1618        };
1619
1620        self.resolve_locations(id, cx);
1621        Ok(())
1622    }
1623
1624    fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1625        self.entries
1626            .iter()
1627            .enumerate()
1628            .rev()
1629            .find_map(|(index, entry)| {
1630                if let AgentThreadEntry::ToolCall(tool_call) = entry
1631                    && &tool_call.id == id
1632                {
1633                    Some(index)
1634                } else {
1635                    None
1636                }
1637            })
1638    }
1639
1640    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1641        // The tool call we are looking for is typically the last one, or very close to the end.
1642        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1643        self.entries
1644            .iter_mut()
1645            .enumerate()
1646            .rev()
1647            .find_map(|(index, tool_call)| {
1648                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1649                    && &tool_call.id == id
1650                {
1651                    Some((index, tool_call))
1652                } else {
1653                    None
1654                }
1655            })
1656    }
1657
1658    pub fn tool_call(&self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1659        self.entries
1660            .iter()
1661            .enumerate()
1662            .rev()
1663            .find_map(|(index, tool_call)| {
1664                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1665                    && &tool_call.id == id
1666                {
1667                    Some((index, tool_call))
1668                } else {
1669                    None
1670                }
1671            })
1672    }
1673
1674    pub fn tool_call_for_subagent(&self, session_id: &acp::SessionId) -> Option<&ToolCall> {
1675        self.entries.iter().find_map(|entry| match entry {
1676            AgentThreadEntry::ToolCall(tool_call) => {
1677                if let Some(subagent_session_info) = &tool_call.subagent_session_info
1678                    && &subagent_session_info.session_id == session_id
1679                {
1680                    Some(tool_call)
1681                } else {
1682                    None
1683                }
1684            }
1685            _ => None,
1686        })
1687    }
1688
1689    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1690        let project = self.project.clone();
1691        let should_update_agent_location = self.parent_session_id.is_none();
1692        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1693            return;
1694        };
1695        let task = tool_call.resolve_locations(project, cx);
1696        cx.spawn(async move |this, cx| {
1697            let resolved_locations = task.await;
1698
1699            this.update(cx, |this, cx| {
1700                let project = this.project.clone();
1701
1702                for location in resolved_locations.iter().flatten() {
1703                    this.shared_buffers
1704                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1705                }
1706                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1707                    return;
1708                };
1709
1710                if let Some(Some(location)) = resolved_locations.last() {
1711                    project.update(cx, |project, cx| {
1712                        let should_ignore = if let Some(agent_location) = project
1713                            .agent_location()
1714                            .filter(|agent_location| agent_location.buffer == location.buffer)
1715                        {
1716                            let snapshot = location.buffer.read(cx).snapshot();
1717                            let old_position = agent_location.position.to_point(&snapshot);
1718                            let new_position = location.position.to_point(&snapshot);
1719
1720                            // ignore this so that when we get updates from the edit tool
1721                            // the position doesn't reset to the startof line
1722                            old_position.row == new_position.row
1723                                && old_position.column > new_position.column
1724                        } else {
1725                            false
1726                        };
1727                        if !should_ignore && should_update_agent_location {
1728                            project.set_agent_location(Some(location.into()), cx);
1729                        }
1730                    });
1731                }
1732
1733                let resolved_locations = resolved_locations
1734                    .iter()
1735                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
1736                    .collect::<Vec<_>>();
1737
1738                if tool_call.resolved_locations != resolved_locations {
1739                    tool_call.resolved_locations = resolved_locations;
1740                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1741                }
1742            })
1743        })
1744        .detach();
1745    }
1746
1747    pub fn request_tool_call_authorization(
1748        &mut self,
1749        tool_call: acp::ToolCallUpdate,
1750        options: PermissionOptions,
1751        cx: &mut Context<Self>,
1752    ) -> Result<Task<acp::RequestPermissionOutcome>> {
1753        let (tx, rx) = oneshot::channel();
1754
1755        let status = ToolCallStatus::WaitingForConfirmation {
1756            options,
1757            respond_tx: tx,
1758        };
1759
1760        let tool_call_id = tool_call.tool_call_id.clone();
1761        self.upsert_tool_call_inner(tool_call, status, cx)?;
1762        cx.emit(AcpThreadEvent::ToolAuthorizationRequested(
1763            tool_call_id.clone(),
1764        ));
1765
1766        Ok(cx.spawn(async move |this, cx| {
1767            let outcome = match rx.await {
1768                Ok(option) => acp::RequestPermissionOutcome::Selected(
1769                    acp::SelectedPermissionOutcome::new(option),
1770                ),
1771                Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1772            };
1773            this.update(cx, |_this, cx| {
1774                cx.emit(AcpThreadEvent::ToolAuthorizationReceived(tool_call_id))
1775            })
1776            .ok();
1777            outcome
1778        }))
1779    }
1780
1781    pub fn authorize_tool_call(
1782        &mut self,
1783        id: acp::ToolCallId,
1784        option_id: acp::PermissionOptionId,
1785        option_kind: acp::PermissionOptionKind,
1786        cx: &mut Context<Self>,
1787    ) {
1788        let Some((ix, call)) = self.tool_call_mut(&id) else {
1789            return;
1790        };
1791
1792        let new_status = match option_kind {
1793            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1794                ToolCallStatus::Rejected
1795            }
1796            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1797                ToolCallStatus::InProgress
1798            }
1799            _ => ToolCallStatus::InProgress,
1800        };
1801
1802        let curr_status = mem::replace(&mut call.status, new_status);
1803
1804        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1805            respond_tx.send(option_id).log_err();
1806        } else if cfg!(debug_assertions) {
1807            panic!("tried to authorize an already authorized tool call");
1808        }
1809
1810        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1811    }
1812
1813    pub fn plan(&self) -> &Plan {
1814        &self.plan
1815    }
1816
1817    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1818        let new_entries_len = request.entries.len();
1819        let mut new_entries = request.entries.into_iter();
1820
1821        // Reuse existing markdown to prevent flickering
1822        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1823            let PlanEntry {
1824                content,
1825                priority,
1826                status,
1827            } = old;
1828            content.update(cx, |old, cx| {
1829                old.replace(new.content, cx);
1830            });
1831            *priority = new.priority;
1832            *status = new.status;
1833        }
1834        for new in new_entries {
1835            self.plan.entries.push(PlanEntry::from_acp(new, cx))
1836        }
1837        self.plan.entries.truncate(new_entries_len);
1838
1839        cx.notify();
1840    }
1841
1842    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1843        self.plan
1844            .entries
1845            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1846        cx.notify();
1847    }
1848
1849    #[cfg(any(test, feature = "test-support"))]
1850    pub fn send_raw(
1851        &mut self,
1852        message: &str,
1853        cx: &mut Context<Self>,
1854    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
1855        self.send(vec![message.into()], cx)
1856    }
1857
1858    pub fn send(
1859        &mut self,
1860        message: Vec<acp::ContentBlock>,
1861        cx: &mut Context<Self>,
1862    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
1863        let block = ContentBlock::new_combined(
1864            message.clone(),
1865            self.project.read(cx).languages().clone(),
1866            self.project.read(cx).path_style(cx),
1867            cx,
1868        );
1869        let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
1870        let git_store = self.project.read(cx).git_store().clone();
1871
1872        let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
1873            Some(UserMessageId::new())
1874        } else {
1875            None
1876        };
1877
1878        self.run_turn(cx, async move |this, cx| {
1879            this.update(cx, |this, cx| {
1880                this.push_entry(
1881                    AgentThreadEntry::UserMessage(UserMessage {
1882                        id: message_id.clone(),
1883                        content: block,
1884                        chunks: message,
1885                        checkpoint: None,
1886                        indented: false,
1887                    }),
1888                    cx,
1889                );
1890            })
1891            .ok();
1892
1893            let old_checkpoint = git_store
1894                .update(cx, |git, cx| git.checkpoint(cx))
1895                .await
1896                .context("failed to get old checkpoint")
1897                .log_err();
1898            this.update(cx, |this, cx| {
1899                if let Some((_ix, message)) = this.last_user_message() {
1900                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1901                        git_checkpoint,
1902                        show: false,
1903                    });
1904                }
1905                this.connection.prompt(message_id, request, cx)
1906            })?
1907            .await
1908        })
1909    }
1910
1911    pub fn can_retry(&self, cx: &App) -> bool {
1912        self.connection.retry(&self.session_id, cx).is_some()
1913    }
1914
1915    pub fn retry(
1916        &mut self,
1917        cx: &mut Context<Self>,
1918    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
1919        self.run_turn(cx, async move |this, cx| {
1920            this.update(cx, |this, cx| {
1921                this.connection
1922                    .retry(&this.session_id, cx)
1923                    .map(|retry| retry.run(cx))
1924            })?
1925            .context("retrying a session is not supported")?
1926            .await
1927        })
1928    }
1929
1930    fn run_turn(
1931        &mut self,
1932        cx: &mut Context<Self>,
1933        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1934    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
1935        self.clear_completed_plan_entries(cx);
1936        self.had_error = false;
1937
1938        let (tx, rx) = oneshot::channel();
1939        let cancel_task = self.cancel(cx);
1940
1941        self.turn_id += 1;
1942        let turn_id = self.turn_id;
1943        self.running_turn = Some(RunningTurn {
1944            id: turn_id,
1945            send_task: cx.spawn(async move |this, cx| {
1946                cancel_task.await;
1947                tx.send(f(this, cx).await).ok();
1948            }),
1949        });
1950
1951        cx.spawn(async move |this, cx| {
1952            let response = rx.await;
1953
1954            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1955                .await?;
1956
1957            this.update(cx, |this, cx| {
1958                if this.parent_session_id.is_none() {
1959                    this.project
1960                        .update(cx, |project, cx| project.set_agent_location(None, cx));
1961                }
1962                let Ok(response) = response else {
1963                    // tx dropped, just return
1964                    return Ok(None);
1965                };
1966
1967                let is_same_turn = this
1968                    .running_turn
1969                    .as_ref()
1970                    .is_some_and(|turn| turn_id == turn.id);
1971
1972                // If the user submitted a follow up message, running_turn might
1973                // already point to a different turn. Therefore we only want to
1974                // take the task if it's the same turn.
1975                if is_same_turn {
1976                    this.running_turn.take();
1977                }
1978
1979                match response {
1980                    Ok(r) => {
1981                        if r.stop_reason == acp::StopReason::MaxTokens {
1982                            this.had_error = true;
1983                            cx.emit(AcpThreadEvent::Error);
1984                            log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
1985                            return Err(anyhow!("Max tokens reached"));
1986                        }
1987
1988                        let canceled = matches!(r.stop_reason, acp::StopReason::Cancelled);
1989                        if canceled {
1990                            this.mark_pending_tools_as_canceled();
1991                        }
1992
1993                        // Handle refusal - distinguish between user prompt and tool call refusals
1994                        if let acp::StopReason::Refusal = r.stop_reason {
1995                            this.had_error = true;
1996                            if let Some((user_msg_ix, _)) = this.last_user_message() {
1997                                // Check if there's a completed tool call with results after the last user message
1998                                // This indicates the refusal is in response to tool output, not the user's prompt
1999                                let has_completed_tool_call_after_user_msg =
2000                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
2001                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
2002                                            // Check if the tool call has completed and has output
2003                                            matches!(tool_call.status, ToolCallStatus::Completed)
2004                                                && tool_call.raw_output.is_some()
2005                                        } else {
2006                                            false
2007                                        }
2008                                    });
2009
2010                                if has_completed_tool_call_after_user_msg {
2011                                    // Refusal is due to tool output - don't truncate, just notify
2012                                    // The model refused based on what the tool returned
2013                                    cx.emit(AcpThreadEvent::Refusal);
2014                                } else {
2015                                    // User prompt was refused - truncate back to before the user message
2016                                    let range = user_msg_ix..this.entries.len();
2017                                    if range.start < range.end {
2018                                        this.entries.truncate(user_msg_ix);
2019                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
2020                                    }
2021                                    cx.emit(AcpThreadEvent::Refusal);
2022                                }
2023                            } else {
2024                                // No user message found, treat as general refusal
2025                                cx.emit(AcpThreadEvent::Refusal);
2026                            }
2027                        }
2028
2029                        cx.emit(AcpThreadEvent::Stopped(r.stop_reason));
2030                        Ok(Some(r))
2031                    }
2032                    Err(e) => {
2033                        this.had_error = true;
2034                        cx.emit(AcpThreadEvent::Error);
2035                        log::error!("Error in run turn: {:?}", e);
2036                        Err(e)
2037                    }
2038                }
2039            })?
2040        })
2041        .boxed()
2042    }
2043
2044    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2045        let Some(turn) = self.running_turn.take() else {
2046            return Task::ready(());
2047        };
2048        self.connection.cancel(&self.session_id, cx);
2049
2050        self.mark_pending_tools_as_canceled();
2051
2052        // Wait for the send task to complete
2053        cx.background_spawn(turn.send_task)
2054    }
2055
2056    fn mark_pending_tools_as_canceled(&mut self) {
2057        for entry in self.entries.iter_mut() {
2058            if let AgentThreadEntry::ToolCall(call) = entry {
2059                let cancel = matches!(
2060                    call.status,
2061                    ToolCallStatus::Pending
2062                        | ToolCallStatus::WaitingForConfirmation { .. }
2063                        | ToolCallStatus::InProgress
2064                );
2065
2066                if cancel {
2067                    call.status = ToolCallStatus::Canceled;
2068                }
2069            }
2070        }
2071    }
2072
2073    /// Restores the git working tree to the state at the given checkpoint (if one exists)
2074    pub fn restore_checkpoint(
2075        &mut self,
2076        id: UserMessageId,
2077        cx: &mut Context<Self>,
2078    ) -> Task<Result<()>> {
2079        let Some((_, message)) = self.user_message_mut(&id) else {
2080            return Task::ready(Err(anyhow!("message not found")));
2081        };
2082
2083        let checkpoint = message
2084            .checkpoint
2085            .as_ref()
2086            .map(|c| c.git_checkpoint.clone());
2087
2088        // Cancel any in-progress generation before restoring
2089        let cancel_task = self.cancel(cx);
2090        let rewind = self.rewind(id.clone(), cx);
2091        let git_store = self.project.read(cx).git_store().clone();
2092
2093        cx.spawn(async move |_, cx| {
2094            cancel_task.await;
2095            rewind.await?;
2096            if let Some(checkpoint) = checkpoint {
2097                git_store
2098                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2099                    .await?;
2100            }
2101
2102            Ok(())
2103        })
2104    }
2105
2106    /// Rewinds this thread to before the entry at `index`, removing it and all
2107    /// subsequent entries while rejecting any action_log changes made from that point.
2108    /// Unlike `restore_checkpoint`, this method does not restore from git.
2109    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2110        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2111            return Task::ready(Err(anyhow!("not supported")));
2112        };
2113
2114        let telemetry = ActionLogTelemetry::from(&*self);
2115        cx.spawn(async move |this, cx| {
2116            cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2117            this.update(cx, |this, cx| {
2118                if let Some((ix, _)) = this.user_message_mut(&id) {
2119                    // Collect all terminals from entries that will be removed
2120                    let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2121                        .iter()
2122                        .flat_map(|entry| entry.terminals())
2123                        .filter_map(|terminal| terminal.read(cx).id().clone().into())
2124                        .collect();
2125
2126                    let range = ix..this.entries.len();
2127                    this.entries.truncate(ix);
2128                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
2129
2130                    // Kill and remove the terminals
2131                    for terminal_id in terminals_to_remove {
2132                        if let Some(terminal) = this.terminals.remove(&terminal_id) {
2133                            terminal.update(cx, |terminal, cx| {
2134                                terminal.kill(cx);
2135                            });
2136                        }
2137                    }
2138                }
2139                this.action_log().update(cx, |action_log, cx| {
2140                    action_log.reject_all_edits(Some(telemetry), cx)
2141                })
2142            })?
2143            .await;
2144            Ok(())
2145        })
2146    }
2147
2148    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2149        let git_store = self.project.read(cx).git_store().clone();
2150
2151        let Some((_, message)) = self.last_user_message() else {
2152            return Task::ready(Ok(()));
2153        };
2154        let Some(user_message_id) = message.id.clone() else {
2155            return Task::ready(Ok(()));
2156        };
2157        let Some(checkpoint) = message.checkpoint.as_ref() else {
2158            return Task::ready(Ok(()));
2159        };
2160        let old_checkpoint = checkpoint.git_checkpoint.clone();
2161
2162        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2163        cx.spawn(async move |this, cx| {
2164            let Some(new_checkpoint) = new_checkpoint
2165                .await
2166                .context("failed to get new checkpoint")
2167                .log_err()
2168            else {
2169                return Ok(());
2170            };
2171
2172            let equal = git_store
2173                .update(cx, |git, cx| {
2174                    git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2175                })
2176                .await
2177                .unwrap_or(true);
2178
2179            this.update(cx, |this, cx| {
2180                if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2181                    if let Some(checkpoint) = message.checkpoint.as_mut() {
2182                        checkpoint.show = !equal;
2183                        cx.emit(AcpThreadEvent::EntryUpdated(ix));
2184                    }
2185                }
2186            })?;
2187
2188            Ok(())
2189        })
2190    }
2191
2192    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2193        self.entries
2194            .iter_mut()
2195            .enumerate()
2196            .rev()
2197            .find_map(|(ix, entry)| {
2198                if let AgentThreadEntry::UserMessage(message) = entry {
2199                    Some((ix, message))
2200                } else {
2201                    None
2202                }
2203            })
2204    }
2205
2206    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2207        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2208            if let AgentThreadEntry::UserMessage(message) = entry {
2209                if message.id.as_ref() == Some(id) {
2210                    Some((ix, message))
2211                } else {
2212                    None
2213                }
2214            } else {
2215                None
2216            }
2217        })
2218    }
2219
2220    pub fn read_text_file(
2221        &self,
2222        path: PathBuf,
2223        line: Option<u32>,
2224        limit: Option<u32>,
2225        reuse_shared_snapshot: bool,
2226        cx: &mut Context<Self>,
2227    ) -> Task<Result<String, acp::Error>> {
2228        // Args are 1-based, move to 0-based
2229        let line = line.unwrap_or_default().saturating_sub(1);
2230        let limit = limit.unwrap_or(u32::MAX);
2231        let project = self.project.clone();
2232        let action_log = self.action_log.clone();
2233        let should_update_agent_location = self.parent_session_id.is_none();
2234        cx.spawn(async move |this, cx| {
2235            let load = project.update(cx, |project, cx| {
2236                let path = project
2237                    .project_path_for_absolute_path(&path, cx)
2238                    .ok_or_else(|| {
2239                        acp::Error::resource_not_found(Some(path.display().to_string()))
2240                    })?;
2241                Ok::<_, acp::Error>(project.open_buffer(path, cx))
2242            })?;
2243
2244            let buffer = load.await?;
2245
2246            let snapshot = if reuse_shared_snapshot {
2247                this.read_with(cx, |this, _| {
2248                    this.shared_buffers.get(&buffer.clone()).cloned()
2249                })
2250                .log_err()
2251                .flatten()
2252            } else {
2253                None
2254            };
2255
2256            let snapshot = if let Some(snapshot) = snapshot {
2257                snapshot
2258            } else {
2259                action_log.update(cx, |action_log, cx| {
2260                    action_log.buffer_read(buffer.clone(), cx);
2261                });
2262
2263                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
2264                this.update(cx, |this, _| {
2265                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2266                })?;
2267                snapshot
2268            };
2269
2270            let max_point = snapshot.max_point();
2271            let start_position = Point::new(line, 0);
2272
2273            if start_position > max_point {
2274                return Err(acp::Error::invalid_params().data(format!(
2275                    "Attempting to read beyond the end of the file, line {}:{}",
2276                    max_point.row + 1,
2277                    max_point.column
2278                )));
2279            }
2280
2281            let start = snapshot.anchor_before(start_position);
2282            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2283
2284            if should_update_agent_location {
2285                project.update(cx, |project, cx| {
2286                    project.set_agent_location(
2287                        Some(AgentLocation {
2288                            buffer: buffer.downgrade(),
2289                            position: start,
2290                        }),
2291                        cx,
2292                    );
2293                });
2294            }
2295
2296            Ok(snapshot.text_for_range(start..end).collect::<String>())
2297        })
2298    }
2299
2300    pub fn write_text_file(
2301        &self,
2302        path: PathBuf,
2303        content: String,
2304        cx: &mut Context<Self>,
2305    ) -> Task<Result<()>> {
2306        let project = self.project.clone();
2307        let action_log = self.action_log.clone();
2308        let should_update_agent_location = self.parent_session_id.is_none();
2309        cx.spawn(async move |this, cx| {
2310            let load = project.update(cx, |project, cx| {
2311                let path = project
2312                    .project_path_for_absolute_path(&path, cx)
2313                    .context("invalid path")?;
2314                anyhow::Ok(project.open_buffer(path, cx))
2315            });
2316            let buffer = load?.await?;
2317            let snapshot = this.update(cx, |this, cx| {
2318                this.shared_buffers
2319                    .get(&buffer)
2320                    .cloned()
2321                    .unwrap_or_else(|| buffer.read(cx).snapshot())
2322            })?;
2323            let edits = cx
2324                .background_executor()
2325                .spawn(async move {
2326                    let old_text = snapshot.text();
2327                    text_diff(old_text.as_str(), &content)
2328                        .into_iter()
2329                        .map(|(range, replacement)| {
2330                            (snapshot.anchor_range_around(range), replacement)
2331                        })
2332                        .collect::<Vec<_>>()
2333                })
2334                .await;
2335
2336            if should_update_agent_location {
2337                project.update(cx, |project, cx| {
2338                    project.set_agent_location(
2339                        Some(AgentLocation {
2340                            buffer: buffer.downgrade(),
2341                            position: edits
2342                                .last()
2343                                .map(|(range, _)| range.end)
2344                                .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2345                        }),
2346                        cx,
2347                    );
2348                });
2349            }
2350
2351            let format_on_save = cx.update(|cx| {
2352                action_log.update(cx, |action_log, cx| {
2353                    action_log.buffer_read(buffer.clone(), cx);
2354                });
2355
2356                let format_on_save = buffer.update(cx, |buffer, cx| {
2357                    buffer.edit(edits, None, cx);
2358
2359                    let settings = language::language_settings::language_settings(
2360                        buffer.language().map(|l| l.name()),
2361                        buffer.file(),
2362                        cx,
2363                    );
2364
2365                    settings.format_on_save != FormatOnSave::Off
2366                });
2367                action_log.update(cx, |action_log, cx| {
2368                    action_log.buffer_edited(buffer.clone(), cx);
2369                });
2370                format_on_save
2371            });
2372
2373            if format_on_save {
2374                let format_task = project.update(cx, |project, cx| {
2375                    project.format(
2376                        HashSet::from_iter([buffer.clone()]),
2377                        LspFormatTarget::Buffers,
2378                        false,
2379                        FormatTrigger::Save,
2380                        cx,
2381                    )
2382                });
2383                format_task.await.log_err();
2384
2385                action_log.update(cx, |action_log, cx| {
2386                    action_log.buffer_edited(buffer.clone(), cx);
2387                });
2388            }
2389
2390            project
2391                .update(cx, |project, cx| project.save_buffer(buffer, cx))
2392                .await
2393        })
2394    }
2395
2396    pub fn create_terminal(
2397        &self,
2398        command: String,
2399        args: Vec<String>,
2400        extra_env: Vec<acp::EnvVariable>,
2401        cwd: Option<PathBuf>,
2402        output_byte_limit: Option<u64>,
2403        cx: &mut Context<Self>,
2404    ) -> Task<Result<Entity<Terminal>>> {
2405        let env = match &cwd {
2406            Some(dir) => self.project.update(cx, |project, cx| {
2407                project.environment().update(cx, |env, cx| {
2408                    env.directory_environment(dir.as_path().into(), cx)
2409                })
2410            }),
2411            None => Task::ready(None).shared(),
2412        };
2413        let env = cx.spawn(async move |_, _| {
2414            let mut env = env.await.unwrap_or_default();
2415            // Disables paging for `git` and hopefully other commands
2416            env.insert("PAGER".into(), "".into());
2417            for var in extra_env {
2418                env.insert(var.name, var.value);
2419            }
2420            env
2421        });
2422
2423        let project = self.project.clone();
2424        let language_registry = project.read(cx).languages().clone();
2425        let is_windows = project.read(cx).path_style(cx).is_windows();
2426
2427        let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2428        let terminal_task = cx.spawn({
2429            let terminal_id = terminal_id.clone();
2430            async move |_this, cx| {
2431                let env = env.await;
2432                let shell = project
2433                    .update(cx, |project, cx| {
2434                        project
2435                            .remote_client()
2436                            .and_then(|r| r.read(cx).default_system_shell())
2437                    })
2438                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2439                let (task_command, task_args) =
2440                    ShellBuilder::new(&Shell::Program(shell), is_windows)
2441                        .redirect_stdin_to_dev_null()
2442                        .build(Some(command.clone()), &args);
2443                let terminal = project
2444                    .update(cx, |project, cx| {
2445                        project.create_terminal_task(
2446                            task::SpawnInTerminal {
2447                                command: Some(task_command),
2448                                args: task_args,
2449                                cwd: cwd.clone(),
2450                                env,
2451                                ..Default::default()
2452                            },
2453                            cx,
2454                        )
2455                    })
2456                    .await?;
2457
2458                anyhow::Ok(cx.new(|cx| {
2459                    Terminal::new(
2460                        terminal_id,
2461                        &format!("{} {}", command, args.join(" ")),
2462                        cwd,
2463                        output_byte_limit.map(|l| l as usize),
2464                        terminal,
2465                        language_registry,
2466                        cx,
2467                    )
2468                }))
2469            }
2470        });
2471
2472        cx.spawn(async move |this, cx| {
2473            let terminal = terminal_task.await?;
2474            this.update(cx, |this, _cx| {
2475                this.terminals.insert(terminal_id, terminal.clone());
2476                terminal
2477            })
2478        })
2479    }
2480
2481    pub fn kill_terminal(
2482        &mut self,
2483        terminal_id: acp::TerminalId,
2484        cx: &mut Context<Self>,
2485    ) -> Result<()> {
2486        self.terminals
2487            .get(&terminal_id)
2488            .context("Terminal not found")?
2489            .update(cx, |terminal, cx| {
2490                terminal.kill(cx);
2491            });
2492
2493        Ok(())
2494    }
2495
2496    pub fn release_terminal(
2497        &mut self,
2498        terminal_id: acp::TerminalId,
2499        cx: &mut Context<Self>,
2500    ) -> Result<()> {
2501        self.terminals
2502            .remove(&terminal_id)
2503            .context("Terminal not found")?
2504            .update(cx, |terminal, cx| {
2505                terminal.kill(cx);
2506            });
2507
2508        Ok(())
2509    }
2510
2511    pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2512        self.terminals
2513            .get(&terminal_id)
2514            .context("Terminal not found")
2515            .cloned()
2516    }
2517
2518    pub fn to_markdown(&self, cx: &App) -> String {
2519        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2520    }
2521
2522    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2523        cx.emit(AcpThreadEvent::LoadError(error));
2524    }
2525
2526    pub fn register_terminal_created(
2527        &mut self,
2528        terminal_id: acp::TerminalId,
2529        command_label: String,
2530        working_dir: Option<PathBuf>,
2531        output_byte_limit: Option<u64>,
2532        terminal: Entity<::terminal::Terminal>,
2533        cx: &mut Context<Self>,
2534    ) -> Entity<Terminal> {
2535        let language_registry = self.project.read(cx).languages().clone();
2536
2537        let entity = cx.new(|cx| {
2538            Terminal::new(
2539                terminal_id.clone(),
2540                &command_label,
2541                working_dir.clone(),
2542                output_byte_limit.map(|l| l as usize),
2543                terminal,
2544                language_registry,
2545                cx,
2546            )
2547        });
2548        self.terminals.insert(terminal_id.clone(), entity.clone());
2549        entity
2550    }
2551
2552    pub fn mark_as_subagent_output(&mut self, cx: &mut Context<Self>) {
2553        for entry in self.entries.iter_mut().rev() {
2554            if let AgentThreadEntry::AssistantMessage(assistant_message) = entry {
2555                assistant_message.is_subagent_output = true;
2556                cx.notify();
2557                return;
2558            }
2559        }
2560    }
2561
2562    pub fn on_terminal_provider_event(
2563        &mut self,
2564        event: TerminalProviderEvent,
2565        cx: &mut Context<Self>,
2566    ) {
2567        match event {
2568            TerminalProviderEvent::Created {
2569                terminal_id,
2570                label,
2571                cwd,
2572                output_byte_limit,
2573                terminal,
2574            } => {
2575                let entity = self.register_terminal_created(
2576                    terminal_id.clone(),
2577                    label,
2578                    cwd,
2579                    output_byte_limit,
2580                    terminal,
2581                    cx,
2582                );
2583
2584                if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
2585                    for data in chunks.drain(..) {
2586                        entity.update(cx, |term, cx| {
2587                            term.inner().update(cx, |inner, cx| {
2588                                inner.write_output(&data, cx);
2589                            })
2590                        });
2591                    }
2592                }
2593
2594                if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
2595                    entity.update(cx, |_term, cx| {
2596                        cx.notify();
2597                    });
2598                }
2599
2600                cx.notify();
2601            }
2602            TerminalProviderEvent::Output { terminal_id, data } => {
2603                if let Some(entity) = self.terminals.get(&terminal_id) {
2604                    entity.update(cx, |term, cx| {
2605                        term.inner().update(cx, |inner, cx| {
2606                            inner.write_output(&data, cx);
2607                        })
2608                    });
2609                } else {
2610                    self.pending_terminal_output
2611                        .entry(terminal_id)
2612                        .or_default()
2613                        .push(data);
2614                }
2615            }
2616            TerminalProviderEvent::TitleChanged { terminal_id, title } => {
2617                if let Some(entity) = self.terminals.get(&terminal_id) {
2618                    entity.update(cx, |term, cx| {
2619                        term.inner().update(cx, |inner, cx| {
2620                            inner.breadcrumb_text = title;
2621                            cx.emit(::terminal::Event::BreadcrumbsChanged);
2622                        })
2623                    });
2624                }
2625            }
2626            TerminalProviderEvent::Exit {
2627                terminal_id,
2628                status,
2629            } => {
2630                if let Some(entity) = self.terminals.get(&terminal_id) {
2631                    entity.update(cx, |_term, cx| {
2632                        cx.notify();
2633                    });
2634                } else {
2635                    self.pending_terminal_exit.insert(terminal_id, status);
2636                }
2637            }
2638        }
2639    }
2640}
2641
2642fn markdown_for_raw_output(
2643    raw_output: &serde_json::Value,
2644    language_registry: &Arc<LanguageRegistry>,
2645    cx: &mut App,
2646) -> Option<Entity<Markdown>> {
2647    match raw_output {
2648        serde_json::Value::Null => None,
2649        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2650            Markdown::new(
2651                value.to_string().into(),
2652                Some(language_registry.clone()),
2653                None,
2654                cx,
2655            )
2656        })),
2657        serde_json::Value::Number(value) => Some(cx.new(|cx| {
2658            Markdown::new(
2659                value.to_string().into(),
2660                Some(language_registry.clone()),
2661                None,
2662                cx,
2663            )
2664        })),
2665        serde_json::Value::String(value) => Some(cx.new(|cx| {
2666            Markdown::new(
2667                value.clone().into(),
2668                Some(language_registry.clone()),
2669                None,
2670                cx,
2671            )
2672        })),
2673        value => Some(cx.new(|cx| {
2674            let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
2675
2676            Markdown::new(
2677                format!("```json\n{}\n```", pretty_json).into(),
2678                Some(language_registry.clone()),
2679                None,
2680                cx,
2681            )
2682        })),
2683    }
2684}
2685
2686#[cfg(test)]
2687mod tests {
2688    use super::*;
2689    use anyhow::anyhow;
2690    use futures::{channel::mpsc, future::LocalBoxFuture, select};
2691    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2692    use indoc::indoc;
2693    use project::{FakeFs, Fs};
2694    use rand::{distr, prelude::*};
2695    use serde_json::json;
2696    use settings::SettingsStore;
2697    use smol::stream::StreamExt as _;
2698    use std::{
2699        any::Any,
2700        cell::RefCell,
2701        path::Path,
2702        rc::Rc,
2703        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2704        time::Duration,
2705    };
2706    use util::path;
2707
2708    fn init_test(cx: &mut TestAppContext) {
2709        env_logger::try_init().ok();
2710        cx.update(|cx| {
2711            let settings_store = SettingsStore::test(cx);
2712            cx.set_global(settings_store);
2713        });
2714    }
2715
2716    #[gpui::test]
2717    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2718        init_test(cx);
2719
2720        let fs = FakeFs::new(cx.executor());
2721        let project = Project::test(fs, [], cx).await;
2722        let connection = Rc::new(FakeAgentConnection::new());
2723        let thread = cx
2724            .update(|cx| connection.new_session(project, std::path::Path::new(path!("/test")), cx))
2725            .await
2726            .unwrap();
2727
2728        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2729
2730        // Send Output BEFORE Created - should be buffered by acp_thread
2731        thread.update(cx, |thread, cx| {
2732            thread.on_terminal_provider_event(
2733                TerminalProviderEvent::Output {
2734                    terminal_id: terminal_id.clone(),
2735                    data: b"hello buffered".to_vec(),
2736                },
2737                cx,
2738            );
2739        });
2740
2741        // Create a display-only terminal and then send Created
2742        let lower = cx.new(|cx| {
2743            let builder = ::terminal::TerminalBuilder::new_display_only(
2744                ::terminal::terminal_settings::CursorShape::default(),
2745                ::terminal::terminal_settings::AlternateScroll::On,
2746                None,
2747                0,
2748                cx.background_executor(),
2749                PathStyle::local(),
2750            )
2751            .unwrap();
2752            builder.subscribe(cx)
2753        });
2754
2755        thread.update(cx, |thread, cx| {
2756            thread.on_terminal_provider_event(
2757                TerminalProviderEvent::Created {
2758                    terminal_id: terminal_id.clone(),
2759                    label: "Buffered Test".to_string(),
2760                    cwd: None,
2761                    output_byte_limit: None,
2762                    terminal: lower.clone(),
2763                },
2764                cx,
2765            );
2766        });
2767
2768        // After Created, buffered Output should have been flushed into the renderer
2769        let content = thread.read_with(cx, |thread, cx| {
2770            let term = thread.terminal(terminal_id.clone()).unwrap();
2771            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2772        });
2773
2774        assert!(
2775            content.contains("hello buffered"),
2776            "expected buffered output to render, got: {content}"
2777        );
2778    }
2779
2780    #[gpui::test]
2781    async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2782        init_test(cx);
2783
2784        let fs = FakeFs::new(cx.executor());
2785        let project = Project::test(fs, [], cx).await;
2786        let connection = Rc::new(FakeAgentConnection::new());
2787        let thread = cx
2788            .update(|cx| connection.new_session(project, std::path::Path::new(path!("/test")), cx))
2789            .await
2790            .unwrap();
2791
2792        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2793
2794        // Send Output BEFORE Created
2795        thread.update(cx, |thread, cx| {
2796            thread.on_terminal_provider_event(
2797                TerminalProviderEvent::Output {
2798                    terminal_id: terminal_id.clone(),
2799                    data: b"pre-exit data".to_vec(),
2800                },
2801                cx,
2802            );
2803        });
2804
2805        // Send Exit BEFORE Created
2806        thread.update(cx, |thread, cx| {
2807            thread.on_terminal_provider_event(
2808                TerminalProviderEvent::Exit {
2809                    terminal_id: terminal_id.clone(),
2810                    status: acp::TerminalExitStatus::new().exit_code(0),
2811                },
2812                cx,
2813            );
2814        });
2815
2816        // Now create a display-only lower-level terminal and send Created
2817        let lower = cx.new(|cx| {
2818            let builder = ::terminal::TerminalBuilder::new_display_only(
2819                ::terminal::terminal_settings::CursorShape::default(),
2820                ::terminal::terminal_settings::AlternateScroll::On,
2821                None,
2822                0,
2823                cx.background_executor(),
2824                PathStyle::local(),
2825            )
2826            .unwrap();
2827            builder.subscribe(cx)
2828        });
2829
2830        thread.update(cx, |thread, cx| {
2831            thread.on_terminal_provider_event(
2832                TerminalProviderEvent::Created {
2833                    terminal_id: terminal_id.clone(),
2834                    label: "Buffered Exit Test".to_string(),
2835                    cwd: None,
2836                    output_byte_limit: None,
2837                    terminal: lower.clone(),
2838                },
2839                cx,
2840            );
2841        });
2842
2843        // Output should be present after Created (flushed from buffer)
2844        let content = thread.read_with(cx, |thread, cx| {
2845            let term = thread.terminal(terminal_id.clone()).unwrap();
2846            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2847        });
2848
2849        assert!(
2850            content.contains("pre-exit data"),
2851            "expected pre-exit data to render, got: {content}"
2852        );
2853    }
2854
2855    /// Test that killing a terminal via Terminal::kill properly:
2856    /// 1. Causes wait_for_exit to complete (doesn't hang forever)
2857    /// 2. The underlying terminal still has the output that was written before the kill
2858    ///
2859    /// This test verifies that the fix to kill_active_task (which now also kills
2860    /// the shell process in addition to the foreground process) properly allows
2861    /// wait_for_exit to complete instead of hanging indefinitely.
2862    #[cfg(unix)]
2863    #[gpui::test]
2864    async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
2865        use std::collections::HashMap;
2866        use task::Shell;
2867        use util::shell_builder::ShellBuilder;
2868
2869        init_test(cx);
2870        cx.executor().allow_parking();
2871
2872        let fs = FakeFs::new(cx.executor());
2873        let project = Project::test(fs, [], cx).await;
2874        let connection = Rc::new(FakeAgentConnection::new());
2875        let thread = cx
2876            .update(|cx| connection.new_session(project.clone(), Path::new(path!("/test")), cx))
2877            .await
2878            .unwrap();
2879
2880        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2881
2882        // Create a real PTY terminal that runs a command which prints output then sleeps
2883        // We use printf instead of echo and chain with && sleep to ensure proper execution
2884        let (completion_tx, _completion_rx) = smol::channel::unbounded();
2885        let (program, args) = ShellBuilder::new(&Shell::System, false).build(
2886            Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
2887            &[],
2888        );
2889
2890        let builder = cx
2891            .update(|cx| {
2892                ::terminal::TerminalBuilder::new(
2893                    None,
2894                    None,
2895                    task::Shell::WithArguments {
2896                        program,
2897                        args,
2898                        title_override: None,
2899                    },
2900                    HashMap::default(),
2901                    ::terminal::terminal_settings::CursorShape::default(),
2902                    ::terminal::terminal_settings::AlternateScroll::On,
2903                    None,
2904                    vec![],
2905                    0,
2906                    false,
2907                    0,
2908                    Some(completion_tx),
2909                    cx,
2910                    vec![],
2911                    PathStyle::local(),
2912                )
2913            })
2914            .await
2915            .unwrap();
2916
2917        let lower_terminal = cx.new(|cx| builder.subscribe(cx));
2918
2919        // Create the acp_thread Terminal wrapper
2920        thread.update(cx, |thread, cx| {
2921            thread.on_terminal_provider_event(
2922                TerminalProviderEvent::Created {
2923                    terminal_id: terminal_id.clone(),
2924                    label: "printf output_before_kill && sleep 60".to_string(),
2925                    cwd: None,
2926                    output_byte_limit: None,
2927                    terminal: lower_terminal.clone(),
2928                },
2929                cx,
2930            );
2931        });
2932
2933        // Wait for the printf command to execute and produce output
2934        // Use real time since parking is enabled
2935        cx.executor().timer(Duration::from_millis(500)).await;
2936
2937        // Get the acp_thread Terminal and kill it
2938        let wait_for_exit = thread.update(cx, |thread, cx| {
2939            let term = thread.terminals.get(&terminal_id).unwrap();
2940            let wait_for_exit = term.read(cx).wait_for_exit();
2941            term.update(cx, |term, cx| {
2942                term.kill(cx);
2943            });
2944            wait_for_exit
2945        });
2946
2947        // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
2948        // Before the fix to kill_active_task, this would hang forever because
2949        // only the foreground process was killed, not the shell, so the PTY
2950        // child never exited and wait_for_completed_task never completed.
2951        let exit_result = futures::select! {
2952            result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
2953            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
2954        };
2955
2956        assert!(
2957            exit_result.is_some(),
2958            "wait_for_exit should complete after kill, but it timed out. \
2959            This indicates kill_active_task is not properly killing the shell process."
2960        );
2961
2962        // Give the system a chance to process any pending updates
2963        cx.run_until_parked();
2964
2965        // Verify that the underlying terminal still has the output that was
2966        // written before the kill. This verifies that killing doesn't lose output.
2967        let inner_content = thread.read_with(cx, |thread, cx| {
2968            let term = thread.terminals.get(&terminal_id).unwrap();
2969            term.read(cx).inner().read(cx).get_content()
2970        });
2971
2972        assert!(
2973            inner_content.contains("output_before_kill"),
2974            "Underlying terminal should contain output from before kill, got: {}",
2975            inner_content
2976        );
2977    }
2978
2979    #[gpui::test]
2980    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2981        init_test(cx);
2982
2983        let fs = FakeFs::new(cx.executor());
2984        let project = Project::test(fs, [], cx).await;
2985        let connection = Rc::new(FakeAgentConnection::new());
2986        let thread = cx
2987            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
2988            .await
2989            .unwrap();
2990
2991        // Test creating a new user message
2992        thread.update(cx, |thread, cx| {
2993            thread.push_user_content_block(None, "Hello, ".into(), cx);
2994        });
2995
2996        thread.update(cx, |thread, cx| {
2997            assert_eq!(thread.entries.len(), 1);
2998            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2999                assert_eq!(user_msg.id, None);
3000                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
3001            } else {
3002                panic!("Expected UserMessage");
3003            }
3004        });
3005
3006        // Test appending to existing user message
3007        let message_1_id = UserMessageId::new();
3008        thread.update(cx, |thread, cx| {
3009            thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
3010        });
3011
3012        thread.update(cx, |thread, cx| {
3013            assert_eq!(thread.entries.len(), 1);
3014            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3015                assert_eq!(user_msg.id, Some(message_1_id));
3016                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
3017            } else {
3018                panic!("Expected UserMessage");
3019            }
3020        });
3021
3022        // Test creating new user message after assistant message
3023        thread.update(cx, |thread, cx| {
3024            thread.push_assistant_content_block("Assistant response".into(), false, cx);
3025        });
3026
3027        let message_2_id = UserMessageId::new();
3028        thread.update(cx, |thread, cx| {
3029            thread.push_user_content_block(
3030                Some(message_2_id.clone()),
3031                "New user message".into(),
3032                cx,
3033            );
3034        });
3035
3036        thread.update(cx, |thread, cx| {
3037            assert_eq!(thread.entries.len(), 3);
3038            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
3039                assert_eq!(user_msg.id, Some(message_2_id));
3040                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
3041            } else {
3042                panic!("Expected UserMessage at index 2");
3043            }
3044        });
3045    }
3046
3047    #[gpui::test]
3048    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
3049        init_test(cx);
3050
3051        let fs = FakeFs::new(cx.executor());
3052        let project = Project::test(fs, [], cx).await;
3053        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3054            |_, thread, mut cx| {
3055                async move {
3056                    thread.update(&mut cx, |thread, cx| {
3057                        thread
3058                            .handle_session_update(
3059                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3060                                    "Thinking ".into(),
3061                                )),
3062                                cx,
3063                            )
3064                            .unwrap();
3065                        thread
3066                            .handle_session_update(
3067                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3068                                    "hard!".into(),
3069                                )),
3070                                cx,
3071                            )
3072                            .unwrap();
3073                    })?;
3074                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3075                }
3076                .boxed_local()
3077            },
3078        ));
3079
3080        let thread = cx
3081            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3082            .await
3083            .unwrap();
3084
3085        thread
3086            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3087            .await
3088            .unwrap();
3089
3090        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3091        assert_eq!(
3092            output,
3093            indoc! {r#"
3094            ## User
3095
3096            Hello from Zed!
3097
3098            ## Assistant
3099
3100            <thinking>
3101            Thinking hard!
3102            </thinking>
3103
3104            "#}
3105        );
3106    }
3107
3108    #[gpui::test]
3109    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
3110        init_test(cx);
3111
3112        let fs = FakeFs::new(cx.executor());
3113        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3114            .await;
3115        let project = Project::test(fs.clone(), [], cx).await;
3116        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3117        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3118        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3119            move |_, thread, mut cx| {
3120                let read_file_tx = read_file_tx.clone();
3121                async move {
3122                    let content = thread
3123                        .update(&mut cx, |thread, cx| {
3124                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3125                        })
3126                        .unwrap()
3127                        .await
3128                        .unwrap();
3129                    assert_eq!(content, "one\ntwo\nthree\n");
3130                    read_file_tx.take().unwrap().send(()).unwrap();
3131                    thread
3132                        .update(&mut cx, |thread, cx| {
3133                            thread.write_text_file(
3134                                path!("/tmp/foo").into(),
3135                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
3136                                cx,
3137                            )
3138                        })
3139                        .unwrap()
3140                        .await
3141                        .unwrap();
3142                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3143                }
3144                .boxed_local()
3145            },
3146        ));
3147
3148        let (worktree, pathbuf) = project
3149            .update(cx, |project, cx| {
3150                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3151            })
3152            .await
3153            .unwrap();
3154        let buffer = project
3155            .update(cx, |project, cx| {
3156                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3157            })
3158            .await
3159            .unwrap();
3160
3161        let thread = cx
3162            .update(|cx| connection.new_session(project, Path::new(path!("/tmp")), cx))
3163            .await
3164            .unwrap();
3165
3166        let request = thread.update(cx, |thread, cx| {
3167            thread.send_raw("Extend the count in /tmp/foo", cx)
3168        });
3169        read_file_rx.await.ok();
3170        buffer.update(cx, |buffer, cx| {
3171            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3172        });
3173        cx.run_until_parked();
3174        assert_eq!(
3175            buffer.read_with(cx, |buffer, _| buffer.text()),
3176            "zero\none\ntwo\nthree\nfour\nfive\n"
3177        );
3178        assert_eq!(
3179            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3180            "zero\none\ntwo\nthree\nfour\nfive\n"
3181        );
3182        request.await.unwrap();
3183    }
3184
3185    #[gpui::test]
3186    async fn test_reading_from_line(cx: &mut TestAppContext) {
3187        init_test(cx);
3188
3189        let fs = FakeFs::new(cx.executor());
3190        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3191            .await;
3192        let project = Project::test(fs.clone(), [], cx).await;
3193        project
3194            .update(cx, |project, cx| {
3195                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3196            })
3197            .await
3198            .unwrap();
3199
3200        let connection = Rc::new(FakeAgentConnection::new());
3201
3202        let thread = cx
3203            .update(|cx| connection.new_session(project, Path::new(path!("/tmp")), cx))
3204            .await
3205            .unwrap();
3206
3207        // Whole file
3208        let content = thread
3209            .update(cx, |thread, cx| {
3210                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3211            })
3212            .await
3213            .unwrap();
3214
3215        assert_eq!(content, "one\ntwo\nthree\nfour\n");
3216
3217        // Only start line
3218        let content = thread
3219            .update(cx, |thread, cx| {
3220                thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3221            })
3222            .await
3223            .unwrap();
3224
3225        assert_eq!(content, "three\nfour\n");
3226
3227        // Only limit
3228        let content = thread
3229            .update(cx, |thread, cx| {
3230                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3231            })
3232            .await
3233            .unwrap();
3234
3235        assert_eq!(content, "one\ntwo\n");
3236
3237        // Range
3238        let content = thread
3239            .update(cx, |thread, cx| {
3240                thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3241            })
3242            .await
3243            .unwrap();
3244
3245        assert_eq!(content, "two\nthree\n");
3246
3247        // Invalid
3248        let err = thread
3249            .update(cx, |thread, cx| {
3250                thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3251            })
3252            .await
3253            .unwrap_err();
3254
3255        assert_eq!(
3256            err.to_string(),
3257            "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3258        );
3259    }
3260
3261    #[gpui::test]
3262    async fn test_reading_empty_file(cx: &mut TestAppContext) {
3263        init_test(cx);
3264
3265        let fs = FakeFs::new(cx.executor());
3266        fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3267        let project = Project::test(fs.clone(), [], cx).await;
3268        project
3269            .update(cx, |project, cx| {
3270                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3271            })
3272            .await
3273            .unwrap();
3274
3275        let connection = Rc::new(FakeAgentConnection::new());
3276
3277        let thread = cx
3278            .update(|cx| connection.new_session(project, Path::new(path!("/tmp")), cx))
3279            .await
3280            .unwrap();
3281
3282        // Whole file
3283        let content = thread
3284            .update(cx, |thread, cx| {
3285                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3286            })
3287            .await
3288            .unwrap();
3289
3290        assert_eq!(content, "");
3291
3292        // Only start line
3293        let content = thread
3294            .update(cx, |thread, cx| {
3295                thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3296            })
3297            .await
3298            .unwrap();
3299
3300        assert_eq!(content, "");
3301
3302        // Only limit
3303        let content = thread
3304            .update(cx, |thread, cx| {
3305                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3306            })
3307            .await
3308            .unwrap();
3309
3310        assert_eq!(content, "");
3311
3312        // Range
3313        let content = thread
3314            .update(cx, |thread, cx| {
3315                thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3316            })
3317            .await
3318            .unwrap();
3319
3320        assert_eq!(content, "");
3321
3322        // Invalid
3323        let err = thread
3324            .update(cx, |thread, cx| {
3325                thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3326            })
3327            .await
3328            .unwrap_err();
3329
3330        assert_eq!(
3331            err.to_string(),
3332            "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3333        );
3334    }
3335    #[gpui::test]
3336    async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3337        init_test(cx);
3338
3339        let fs = FakeFs::new(cx.executor());
3340        fs.insert_tree(path!("/tmp"), json!({})).await;
3341        let project = Project::test(fs.clone(), [], cx).await;
3342        project
3343            .update(cx, |project, cx| {
3344                project.find_or_create_worktree(path!("/tmp"), true, cx)
3345            })
3346            .await
3347            .unwrap();
3348
3349        let connection = Rc::new(FakeAgentConnection::new());
3350
3351        let thread = cx
3352            .update(|cx| connection.new_session(project, Path::new(path!("/tmp")), cx))
3353            .await
3354            .unwrap();
3355
3356        // Out of project file
3357        let err = thread
3358            .update(cx, |thread, cx| {
3359                thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3360            })
3361            .await
3362            .unwrap_err();
3363
3364        assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3365    }
3366
3367    #[gpui::test]
3368    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
3369        init_test(cx);
3370
3371        let fs = FakeFs::new(cx.executor());
3372        let project = Project::test(fs, [], cx).await;
3373        let id = acp::ToolCallId::new("test");
3374
3375        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3376            let id = id.clone();
3377            move |_, thread, mut cx| {
3378                let id = id.clone();
3379                async move {
3380                    thread
3381                        .update(&mut cx, |thread, cx| {
3382                            thread.handle_session_update(
3383                                acp::SessionUpdate::ToolCall(
3384                                    acp::ToolCall::new(id.clone(), "Label")
3385                                        .kind(acp::ToolKind::Fetch)
3386                                        .status(acp::ToolCallStatus::InProgress),
3387                                ),
3388                                cx,
3389                            )
3390                        })
3391                        .unwrap()
3392                        .unwrap();
3393                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3394                }
3395                .boxed_local()
3396            }
3397        }));
3398
3399        let thread = cx
3400            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3401            .await
3402            .unwrap();
3403
3404        let request = thread.update(cx, |thread, cx| {
3405            thread.send_raw("Fetch https://example.com", cx)
3406        });
3407
3408        run_until_first_tool_call(&thread, cx).await;
3409
3410        thread.read_with(cx, |thread, _| {
3411            assert!(matches!(
3412                thread.entries[1],
3413                AgentThreadEntry::ToolCall(ToolCall {
3414                    status: ToolCallStatus::InProgress,
3415                    ..
3416                })
3417            ));
3418        });
3419
3420        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3421
3422        thread.read_with(cx, |thread, _| {
3423            assert!(matches!(
3424                &thread.entries[1],
3425                AgentThreadEntry::ToolCall(ToolCall {
3426                    status: ToolCallStatus::Canceled,
3427                    ..
3428                })
3429            ));
3430        });
3431
3432        thread
3433            .update(cx, |thread, cx| {
3434                thread.handle_session_update(
3435                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3436                        id,
3437                        acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3438                    )),
3439                    cx,
3440                )
3441            })
3442            .unwrap();
3443
3444        request.await.unwrap();
3445
3446        thread.read_with(cx, |thread, _| {
3447            assert!(matches!(
3448                thread.entries[1],
3449                AgentThreadEntry::ToolCall(ToolCall {
3450                    status: ToolCallStatus::Completed,
3451                    ..
3452                })
3453            ));
3454        });
3455    }
3456
3457    #[gpui::test]
3458    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3459        init_test(cx);
3460        let fs = FakeFs::new(cx.background_executor.clone());
3461        fs.insert_tree(path!("/test"), json!({})).await;
3462        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3463
3464        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3465            move |_, thread, mut cx| {
3466                async move {
3467                    thread
3468                        .update(&mut cx, |thread, cx| {
3469                            thread.handle_session_update(
3470                                acp::SessionUpdate::ToolCall(
3471                                    acp::ToolCall::new("test", "Label")
3472                                        .kind(acp::ToolKind::Edit)
3473                                        .status(acp::ToolCallStatus::Completed)
3474                                        .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3475                                            "/test/test.txt",
3476                                            "foo",
3477                                        ))]),
3478                                ),
3479                                cx,
3480                            )
3481                        })
3482                        .unwrap()
3483                        .unwrap();
3484                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3485                }
3486                .boxed_local()
3487            }
3488        }));
3489
3490        let thread = cx
3491            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3492            .await
3493            .unwrap();
3494
3495        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3496            .await
3497            .unwrap();
3498
3499        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3500    }
3501
3502    #[gpui::test(iterations = 10)]
3503    async fn test_checkpoints(cx: &mut TestAppContext) {
3504        init_test(cx);
3505        let fs = FakeFs::new(cx.background_executor.clone());
3506        fs.insert_tree(
3507            path!("/test"),
3508            json!({
3509                ".git": {}
3510            }),
3511        )
3512        .await;
3513        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3514
3515        let simulate_changes = Arc::new(AtomicBool::new(true));
3516        let next_filename = Arc::new(AtomicUsize::new(0));
3517        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3518            let simulate_changes = simulate_changes.clone();
3519            let next_filename = next_filename.clone();
3520            let fs = fs.clone();
3521            move |request, thread, mut cx| {
3522                let fs = fs.clone();
3523                let simulate_changes = simulate_changes.clone();
3524                let next_filename = next_filename.clone();
3525                async move {
3526                    if simulate_changes.load(SeqCst) {
3527                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3528                        fs.write(Path::new(&filename), b"").await?;
3529                    }
3530
3531                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3532                        panic!("expected text content block");
3533                    };
3534                    thread.update(&mut cx, |thread, cx| {
3535                        thread
3536                            .handle_session_update(
3537                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3538                                    content.text.to_uppercase().into(),
3539                                )),
3540                                cx,
3541                            )
3542                            .unwrap();
3543                    })?;
3544                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3545                }
3546                .boxed_local()
3547            }
3548        }));
3549        let thread = cx
3550            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3551            .await
3552            .unwrap();
3553
3554        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3555            .await
3556            .unwrap();
3557        thread.read_with(cx, |thread, cx| {
3558            assert_eq!(
3559                thread.to_markdown(cx),
3560                indoc! {"
3561                    ## User (checkpoint)
3562
3563                    Lorem
3564
3565                    ## Assistant
3566
3567                    LOREM
3568
3569                "}
3570            );
3571        });
3572        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3573
3574        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3575            .await
3576            .unwrap();
3577        thread.read_with(cx, |thread, cx| {
3578            assert_eq!(
3579                thread.to_markdown(cx),
3580                indoc! {"
3581                    ## User (checkpoint)
3582
3583                    Lorem
3584
3585                    ## Assistant
3586
3587                    LOREM
3588
3589                    ## User (checkpoint)
3590
3591                    ipsum
3592
3593                    ## Assistant
3594
3595                    IPSUM
3596
3597                "}
3598            );
3599        });
3600        assert_eq!(
3601            fs.files(),
3602            vec![
3603                Path::new(path!("/test/file-0")),
3604                Path::new(path!("/test/file-1"))
3605            ]
3606        );
3607
3608        // Checkpoint isn't stored when there are no changes.
3609        simulate_changes.store(false, SeqCst);
3610        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3611            .await
3612            .unwrap();
3613        thread.read_with(cx, |thread, cx| {
3614            assert_eq!(
3615                thread.to_markdown(cx),
3616                indoc! {"
3617                    ## User (checkpoint)
3618
3619                    Lorem
3620
3621                    ## Assistant
3622
3623                    LOREM
3624
3625                    ## User (checkpoint)
3626
3627                    ipsum
3628
3629                    ## Assistant
3630
3631                    IPSUM
3632
3633                    ## User
3634
3635                    dolor
3636
3637                    ## Assistant
3638
3639                    DOLOR
3640
3641                "}
3642            );
3643        });
3644        assert_eq!(
3645            fs.files(),
3646            vec![
3647                Path::new(path!("/test/file-0")),
3648                Path::new(path!("/test/file-1"))
3649            ]
3650        );
3651
3652        // Rewinding the conversation truncates the history and restores the checkpoint.
3653        thread
3654            .update(cx, |thread, cx| {
3655                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3656                    panic!("unexpected entries {:?}", thread.entries)
3657                };
3658                thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3659            })
3660            .await
3661            .unwrap();
3662        thread.read_with(cx, |thread, cx| {
3663            assert_eq!(
3664                thread.to_markdown(cx),
3665                indoc! {"
3666                    ## User (checkpoint)
3667
3668                    Lorem
3669
3670                    ## Assistant
3671
3672                    LOREM
3673
3674                "}
3675            );
3676        });
3677        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3678    }
3679
3680    #[gpui::test]
3681    async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3682        use std::sync::atomic::AtomicUsize;
3683        init_test(cx);
3684
3685        let fs = FakeFs::new(cx.executor());
3686        let project = Project::test(fs, None, cx).await;
3687
3688        // Create a connection that simulates refusal after tool result
3689        let prompt_count = Arc::new(AtomicUsize::new(0));
3690        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3691            let prompt_count = prompt_count.clone();
3692            move |_request, thread, mut cx| {
3693                let count = prompt_count.fetch_add(1, SeqCst);
3694                async move {
3695                    if count == 0 {
3696                        // First prompt: Generate a tool call with result
3697                        thread.update(&mut cx, |thread, cx| {
3698                            thread
3699                                .handle_session_update(
3700                                    acp::SessionUpdate::ToolCall(
3701                                        acp::ToolCall::new("tool1", "Test Tool")
3702                                            .kind(acp::ToolKind::Fetch)
3703                                            .status(acp::ToolCallStatus::Completed)
3704                                            .raw_input(serde_json::json!({"query": "test"}))
3705                                            .raw_output(serde_json::json!({"result": "inappropriate content"})),
3706                                    ),
3707                                    cx,
3708                                )
3709                                .unwrap();
3710                        })?;
3711
3712                        // Now return refusal because of the tool result
3713                        Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
3714                    } else {
3715                        Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3716                    }
3717                }
3718                .boxed_local()
3719            }
3720        }));
3721
3722        let thread = cx
3723            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3724            .await
3725            .unwrap();
3726
3727        // Track if we see a Refusal event
3728        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3729        let saw_refusal_event_captured = saw_refusal_event.clone();
3730        thread.update(cx, |_thread, cx| {
3731            cx.subscribe(
3732                &thread,
3733                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3734                    if matches!(event, AcpThreadEvent::Refusal) {
3735                        *saw_refusal_event_captured.lock().unwrap() = true;
3736                    }
3737                },
3738            )
3739            .detach();
3740        });
3741
3742        // Send a user message - this will trigger tool call and then refusal
3743        let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
3744        cx.background_executor.spawn(send_task).detach();
3745        cx.run_until_parked();
3746
3747        // Verify that:
3748        // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3749        // 2. The user message was NOT truncated
3750        assert!(
3751            *saw_refusal_event.lock().unwrap(),
3752            "Refusal event should be emitted for tool result refusals"
3753        );
3754
3755        thread.read_with(cx, |thread, _| {
3756            let entries = thread.entries();
3757            assert!(entries.len() >= 2, "Should have user message and tool call");
3758
3759            // Verify user message is still there
3760            assert!(
3761                matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3762                "User message should not be truncated"
3763            );
3764
3765            // Verify tool call is there with result
3766            if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3767                assert!(
3768                    tool_call.raw_output.is_some(),
3769                    "Tool call should have output"
3770                );
3771            } else {
3772                panic!("Expected tool call at index 1");
3773            }
3774        });
3775    }
3776
3777    #[gpui::test]
3778    async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3779        init_test(cx);
3780
3781        let fs = FakeFs::new(cx.executor());
3782        let project = Project::test(fs, None, cx).await;
3783
3784        let refuse_next = Arc::new(AtomicBool::new(false));
3785        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3786            let refuse_next = refuse_next.clone();
3787            move |_request, _thread, _cx| {
3788                if refuse_next.load(SeqCst) {
3789                    async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
3790                        .boxed_local()
3791                } else {
3792                    async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
3793                        .boxed_local()
3794                }
3795            }
3796        }));
3797
3798        let thread = cx
3799            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3800            .await
3801            .unwrap();
3802
3803        // Track if we see a Refusal event
3804        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3805        let saw_refusal_event_captured = saw_refusal_event.clone();
3806        thread.update(cx, |_thread, cx| {
3807            cx.subscribe(
3808                &thread,
3809                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3810                    if matches!(event, AcpThreadEvent::Refusal) {
3811                        *saw_refusal_event_captured.lock().unwrap() = true;
3812                    }
3813                },
3814            )
3815            .detach();
3816        });
3817
3818        // Send a message that will be refused
3819        refuse_next.store(true, SeqCst);
3820        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3821            .await
3822            .unwrap();
3823
3824        // Verify that a Refusal event WAS emitted for user prompt refusal
3825        assert!(
3826            *saw_refusal_event.lock().unwrap(),
3827            "Refusal event should be emitted for user prompt refusals"
3828        );
3829
3830        // Verify the message was truncated (user prompt refusal)
3831        thread.read_with(cx, |thread, cx| {
3832            assert_eq!(thread.to_markdown(cx), "");
3833        });
3834    }
3835
3836    #[gpui::test]
3837    async fn test_refusal(cx: &mut TestAppContext) {
3838        init_test(cx);
3839        let fs = FakeFs::new(cx.background_executor.clone());
3840        fs.insert_tree(path!("/"), json!({})).await;
3841        let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3842
3843        let refuse_next = Arc::new(AtomicBool::new(false));
3844        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3845            let refuse_next = refuse_next.clone();
3846            move |request, thread, mut cx| {
3847                let refuse_next = refuse_next.clone();
3848                async move {
3849                    if refuse_next.load(SeqCst) {
3850                        return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
3851                    }
3852
3853                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3854                        panic!("expected text content block");
3855                    };
3856                    thread.update(&mut cx, |thread, cx| {
3857                        thread
3858                            .handle_session_update(
3859                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3860                                    content.text.to_uppercase().into(),
3861                                )),
3862                                cx,
3863                            )
3864                            .unwrap();
3865                    })?;
3866                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3867                }
3868                .boxed_local()
3869            }
3870        }));
3871        let thread = cx
3872            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3873            .await
3874            .unwrap();
3875
3876        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3877            .await
3878            .unwrap();
3879        thread.read_with(cx, |thread, cx| {
3880            assert_eq!(
3881                thread.to_markdown(cx),
3882                indoc! {"
3883                    ## User
3884
3885                    hello
3886
3887                    ## Assistant
3888
3889                    HELLO
3890
3891                "}
3892            );
3893        });
3894
3895        // Simulate refusing the second message. The message should be truncated
3896        // when a user prompt is refused.
3897        refuse_next.store(true, SeqCst);
3898        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3899            .await
3900            .unwrap();
3901        thread.read_with(cx, |thread, cx| {
3902            assert_eq!(
3903                thread.to_markdown(cx),
3904                indoc! {"
3905                    ## User
3906
3907                    hello
3908
3909                    ## Assistant
3910
3911                    HELLO
3912
3913                "}
3914            );
3915        });
3916    }
3917
3918    async fn run_until_first_tool_call(
3919        thread: &Entity<AcpThread>,
3920        cx: &mut TestAppContext,
3921    ) -> usize {
3922        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3923
3924        let subscription = cx.update(|cx| {
3925            cx.subscribe(thread, move |thread, _, cx| {
3926                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3927                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3928                        return tx.try_send(ix).unwrap();
3929                    }
3930                }
3931            })
3932        });
3933
3934        select! {
3935            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
3936                panic!("Timeout waiting for tool call")
3937            }
3938            ix = rx.next().fuse() => {
3939                drop(subscription);
3940                ix.unwrap()
3941            }
3942        }
3943    }
3944
3945    #[derive(Clone, Default)]
3946    struct FakeAgentConnection {
3947        auth_methods: Vec<acp::AuthMethod>,
3948        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3949        set_title_calls: Rc<RefCell<Vec<SharedString>>>,
3950        on_user_message: Option<
3951            Rc<
3952                dyn Fn(
3953                        acp::PromptRequest,
3954                        WeakEntity<AcpThread>,
3955                        AsyncApp,
3956                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3957                    + 'static,
3958            >,
3959        >,
3960    }
3961
3962    impl FakeAgentConnection {
3963        fn new() -> Self {
3964            Self {
3965                auth_methods: Vec::new(),
3966                on_user_message: None,
3967                sessions: Arc::default(),
3968                set_title_calls: Default::default(),
3969            }
3970        }
3971
3972        #[expect(unused)]
3973        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3974            self.auth_methods = auth_methods;
3975            self
3976        }
3977
3978        fn on_user_message(
3979            mut self,
3980            handler: impl Fn(
3981                acp::PromptRequest,
3982                WeakEntity<AcpThread>,
3983                AsyncApp,
3984            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3985            + 'static,
3986        ) -> Self {
3987            self.on_user_message.replace(Rc::new(handler));
3988            self
3989        }
3990    }
3991
3992    impl AgentConnection for FakeAgentConnection {
3993        fn telemetry_id(&self) -> SharedString {
3994            "fake".into()
3995        }
3996
3997        fn auth_methods(&self) -> &[acp::AuthMethod] {
3998            &self.auth_methods
3999        }
4000
4001        fn new_session(
4002            self: Rc<Self>,
4003            project: Entity<Project>,
4004            cwd: &Path,
4005            cx: &mut App,
4006        ) -> Task<gpui::Result<Entity<AcpThread>>> {
4007            let session_id = acp::SessionId::new(
4008                rand::rng()
4009                    .sample_iter(&distr::Alphanumeric)
4010                    .take(7)
4011                    .map(char::from)
4012                    .collect::<String>(),
4013            );
4014            let action_log = cx.new(|_| ActionLog::new(project.clone()));
4015            let thread = cx.new(|cx| {
4016                AcpThread::new(
4017                    None,
4018                    "Test",
4019                    Some(cwd.to_path_buf()),
4020                    self.clone(),
4021                    project,
4022                    action_log,
4023                    session_id.clone(),
4024                    watch::Receiver::constant(
4025                        acp::PromptCapabilities::new()
4026                            .image(true)
4027                            .audio(true)
4028                            .embedded_context(true),
4029                    ),
4030                    cx,
4031                )
4032            });
4033            self.sessions.lock().insert(session_id, thread.downgrade());
4034            Task::ready(Ok(thread))
4035        }
4036
4037        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
4038            if self.auth_methods().iter().any(|m| m.id == method) {
4039                Task::ready(Ok(()))
4040            } else {
4041                Task::ready(Err(anyhow!("Invalid Auth Method")))
4042            }
4043        }
4044
4045        fn prompt(
4046            &self,
4047            _id: Option<UserMessageId>,
4048            params: acp::PromptRequest,
4049            cx: &mut App,
4050        ) -> Task<gpui::Result<acp::PromptResponse>> {
4051            let sessions = self.sessions.lock();
4052            let thread = sessions.get(&params.session_id).unwrap();
4053            if let Some(handler) = &self.on_user_message {
4054                let handler = handler.clone();
4055                let thread = thread.clone();
4056                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
4057            } else {
4058                Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
4059            }
4060        }
4061
4062        fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
4063
4064        fn truncate(
4065            &self,
4066            session_id: &acp::SessionId,
4067            _cx: &App,
4068        ) -> Option<Rc<dyn AgentSessionTruncate>> {
4069            Some(Rc::new(FakeAgentSessionEditor {
4070                _session_id: session_id.clone(),
4071            }))
4072        }
4073
4074        fn set_title(
4075            &self,
4076            _session_id: &acp::SessionId,
4077            _cx: &App,
4078        ) -> Option<Rc<dyn AgentSessionSetTitle>> {
4079            Some(Rc::new(FakeAgentSessionSetTitle {
4080                calls: self.set_title_calls.clone(),
4081            }))
4082        }
4083
4084        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
4085            self
4086        }
4087    }
4088
4089    struct FakeAgentSessionSetTitle {
4090        calls: Rc<RefCell<Vec<SharedString>>>,
4091    }
4092
4093    impl AgentSessionSetTitle for FakeAgentSessionSetTitle {
4094        fn run(&self, title: SharedString, _cx: &mut App) -> Task<Result<()>> {
4095            self.calls.borrow_mut().push(title);
4096            Task::ready(Ok(()))
4097        }
4098    }
4099
4100    struct FakeAgentSessionEditor {
4101        _session_id: acp::SessionId,
4102    }
4103
4104    impl AgentSessionTruncate for FakeAgentSessionEditor {
4105        fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
4106            Task::ready(Ok(()))
4107        }
4108    }
4109
4110    #[gpui::test]
4111    async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
4112        init_test(cx);
4113
4114        let fs = FakeFs::new(cx.executor());
4115        let project = Project::test(fs, [], cx).await;
4116        let connection = Rc::new(FakeAgentConnection::new());
4117        let thread = cx
4118            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
4119            .await
4120            .unwrap();
4121
4122        // Try to update a tool call that doesn't exist
4123        let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4124        thread.update(cx, |thread, cx| {
4125            let result = thread.handle_session_update(
4126                acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4127                    nonexistent_id.clone(),
4128                    acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4129                )),
4130                cx,
4131            );
4132
4133            // The update should succeed (not return an error)
4134            assert!(result.is_ok());
4135
4136            // There should now be exactly one entry in the thread
4137            assert_eq!(thread.entries.len(), 1);
4138
4139            // The entry should be a failed tool call
4140            if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4141                assert_eq!(tool_call.id, nonexistent_id);
4142                assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4143                assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4144
4145                // Check that the content contains the error message
4146                assert_eq!(tool_call.content.len(), 1);
4147                if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4148                    match content_block {
4149                        ContentBlock::Markdown { markdown } => {
4150                            let markdown_text = markdown.read(cx).source();
4151                            assert!(markdown_text.contains("Tool call not found"));
4152                        }
4153                        ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4154                        ContentBlock::ResourceLink { .. } => {
4155                            panic!("Expected markdown content, got resource link")
4156                        }
4157                        ContentBlock::Image { .. } => {
4158                            panic!("Expected markdown content, got image")
4159                        }
4160                    }
4161                } else {
4162                    panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4163                }
4164            } else {
4165                panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4166            }
4167        });
4168    }
4169
4170    /// Tests that restoring a checkpoint properly cleans up terminals that were
4171    /// created after that checkpoint, and cancels any in-progress generation.
4172    ///
4173    /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4174    /// that were started after that checkpoint should be terminated, and any in-progress
4175    /// AI generation should be canceled.
4176    #[gpui::test]
4177    async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4178        init_test(cx);
4179
4180        let fs = FakeFs::new(cx.executor());
4181        let project = Project::test(fs, [], cx).await;
4182        let connection = Rc::new(FakeAgentConnection::new());
4183        let thread = cx
4184            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
4185            .await
4186            .unwrap();
4187
4188        // Send first user message to create a checkpoint
4189        cx.update(|cx| {
4190            thread.update(cx, |thread, cx| {
4191                thread.send(vec!["first message".into()], cx)
4192            })
4193        })
4194        .await
4195        .unwrap();
4196
4197        // Send second message (creates another checkpoint) - we'll restore to this one
4198        cx.update(|cx| {
4199            thread.update(cx, |thread, cx| {
4200                thread.send(vec!["second message".into()], cx)
4201            })
4202        })
4203        .await
4204        .unwrap();
4205
4206        // Create 2 terminals BEFORE the checkpoint that have completed running
4207        let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4208        let mock_terminal_1 = cx.new(|cx| {
4209            let builder = ::terminal::TerminalBuilder::new_display_only(
4210                ::terminal::terminal_settings::CursorShape::default(),
4211                ::terminal::terminal_settings::AlternateScroll::On,
4212                None,
4213                0,
4214                cx.background_executor(),
4215                PathStyle::local(),
4216            )
4217            .unwrap();
4218            builder.subscribe(cx)
4219        });
4220
4221        thread.update(cx, |thread, cx| {
4222            thread.on_terminal_provider_event(
4223                TerminalProviderEvent::Created {
4224                    terminal_id: terminal_id_1.clone(),
4225                    label: "echo 'first'".to_string(),
4226                    cwd: Some(PathBuf::from("/test")),
4227                    output_byte_limit: None,
4228                    terminal: mock_terminal_1.clone(),
4229                },
4230                cx,
4231            );
4232        });
4233
4234        thread.update(cx, |thread, cx| {
4235            thread.on_terminal_provider_event(
4236                TerminalProviderEvent::Output {
4237                    terminal_id: terminal_id_1.clone(),
4238                    data: b"first\n".to_vec(),
4239                },
4240                cx,
4241            );
4242        });
4243
4244        thread.update(cx, |thread, cx| {
4245            thread.on_terminal_provider_event(
4246                TerminalProviderEvent::Exit {
4247                    terminal_id: terminal_id_1.clone(),
4248                    status: acp::TerminalExitStatus::new().exit_code(0),
4249                },
4250                cx,
4251            );
4252        });
4253
4254        let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4255        let mock_terminal_2 = cx.new(|cx| {
4256            let builder = ::terminal::TerminalBuilder::new_display_only(
4257                ::terminal::terminal_settings::CursorShape::default(),
4258                ::terminal::terminal_settings::AlternateScroll::On,
4259                None,
4260                0,
4261                cx.background_executor(),
4262                PathStyle::local(),
4263            )
4264            .unwrap();
4265            builder.subscribe(cx)
4266        });
4267
4268        thread.update(cx, |thread, cx| {
4269            thread.on_terminal_provider_event(
4270                TerminalProviderEvent::Created {
4271                    terminal_id: terminal_id_2.clone(),
4272                    label: "echo 'second'".to_string(),
4273                    cwd: Some(PathBuf::from("/test")),
4274                    output_byte_limit: None,
4275                    terminal: mock_terminal_2.clone(),
4276                },
4277                cx,
4278            );
4279        });
4280
4281        thread.update(cx, |thread, cx| {
4282            thread.on_terminal_provider_event(
4283                TerminalProviderEvent::Output {
4284                    terminal_id: terminal_id_2.clone(),
4285                    data: b"second\n".to_vec(),
4286                },
4287                cx,
4288            );
4289        });
4290
4291        thread.update(cx, |thread, cx| {
4292            thread.on_terminal_provider_event(
4293                TerminalProviderEvent::Exit {
4294                    terminal_id: terminal_id_2.clone(),
4295                    status: acp::TerminalExitStatus::new().exit_code(0),
4296                },
4297                cx,
4298            );
4299        });
4300
4301        // Get the second message ID to restore to
4302        let second_message_id = thread.read_with(cx, |thread, _| {
4303            // At this point we have:
4304            // - Index 0: First user message (with checkpoint)
4305            // - Index 1: Second user message (with checkpoint)
4306            // No assistant responses because FakeAgentConnection just returns EndTurn
4307            let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4308                panic!("expected user message at index 1");
4309            };
4310            message.id.clone().unwrap()
4311        });
4312
4313        // Create a terminal AFTER the checkpoint we'll restore to.
4314        // This simulates the AI agent starting a long-running terminal command.
4315        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4316        let mock_terminal = cx.new(|cx| {
4317            let builder = ::terminal::TerminalBuilder::new_display_only(
4318                ::terminal::terminal_settings::CursorShape::default(),
4319                ::terminal::terminal_settings::AlternateScroll::On,
4320                None,
4321                0,
4322                cx.background_executor(),
4323                PathStyle::local(),
4324            )
4325            .unwrap();
4326            builder.subscribe(cx)
4327        });
4328
4329        // Register the terminal as created
4330        thread.update(cx, |thread, cx| {
4331            thread.on_terminal_provider_event(
4332                TerminalProviderEvent::Created {
4333                    terminal_id: terminal_id.clone(),
4334                    label: "sleep 1000".to_string(),
4335                    cwd: Some(PathBuf::from("/test")),
4336                    output_byte_limit: None,
4337                    terminal: mock_terminal.clone(),
4338                },
4339                cx,
4340            );
4341        });
4342
4343        // Simulate the terminal producing output (still running)
4344        thread.update(cx, |thread, cx| {
4345            thread.on_terminal_provider_event(
4346                TerminalProviderEvent::Output {
4347                    terminal_id: terminal_id.clone(),
4348                    data: b"terminal is running...\n".to_vec(),
4349                },
4350                cx,
4351            );
4352        });
4353
4354        // Create a tool call entry that references this terminal
4355        // This represents the agent requesting a terminal command
4356        thread.update(cx, |thread, cx| {
4357            thread
4358                .handle_session_update(
4359                    acp::SessionUpdate::ToolCall(
4360                        acp::ToolCall::new("terminal-tool-1", "Running command")
4361                            .kind(acp::ToolKind::Execute)
4362                            .status(acp::ToolCallStatus::InProgress)
4363                            .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4364                                terminal_id.clone(),
4365                            ))])
4366                            .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4367                    ),
4368                    cx,
4369                )
4370                .unwrap();
4371        });
4372
4373        // Verify terminal exists and is in the thread
4374        let terminal_exists_before =
4375            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4376        assert!(
4377            terminal_exists_before,
4378            "Terminal should exist before checkpoint restore"
4379        );
4380
4381        // Verify the terminal's underlying task is still running (not completed)
4382        let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4383            let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4384            terminal_entity.read_with(cx, |term, _cx| {
4385                term.output().is_none() // output is None means it's still running
4386            })
4387        });
4388        assert!(
4389            terminal_running_before,
4390            "Terminal should be running before checkpoint restore"
4391        );
4392
4393        // Verify we have the expected entries before restore
4394        let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4395        assert!(
4396            entry_count_before > 1,
4397            "Should have multiple entries before restore"
4398        );
4399
4400        // Restore the checkpoint to the second message.
4401        // This should:
4402        // 1. Cancel any in-progress generation (via the cancel() call)
4403        // 2. Remove the terminal that was created after that point
4404        thread
4405            .update(cx, |thread, cx| {
4406                thread.restore_checkpoint(second_message_id, cx)
4407            })
4408            .await
4409            .unwrap();
4410
4411        // Verify that no send_task is in progress after restore
4412        // (cancel() clears the send_task)
4413        let has_send_task_after = thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4414        assert!(
4415            !has_send_task_after,
4416            "Should not have a send_task after restore (cancel should have cleared it)"
4417        );
4418
4419        // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4420        let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4421        assert_eq!(
4422            entry_count, 1,
4423            "Should have 1 entry after restore (only the first user message)"
4424        );
4425
4426        // Verify the 2 completed terminals from before the checkpoint still exist
4427        let terminal_1_exists = thread.read_with(cx, |thread, _| {
4428            thread.terminals.contains_key(&terminal_id_1)
4429        });
4430        assert!(
4431            terminal_1_exists,
4432            "Terminal 1 (from before checkpoint) should still exist"
4433        );
4434
4435        let terminal_2_exists = thread.read_with(cx, |thread, _| {
4436            thread.terminals.contains_key(&terminal_id_2)
4437        });
4438        assert!(
4439            terminal_2_exists,
4440            "Terminal 2 (from before checkpoint) should still exist"
4441        );
4442
4443        // Verify they're still in completed state
4444        let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4445            let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4446            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4447        });
4448        assert!(terminal_1_completed, "Terminal 1 should still be completed");
4449
4450        let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4451            let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4452            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4453        });
4454        assert!(terminal_2_completed, "Terminal 2 should still be completed");
4455
4456        // Verify the running terminal (created after checkpoint) was removed
4457        let terminal_3_exists =
4458            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4459        assert!(
4460            !terminal_3_exists,
4461            "Terminal 3 (created after checkpoint) should have been removed"
4462        );
4463
4464        // Verify total count is 2 (the two from before the checkpoint)
4465        let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4466        assert_eq!(
4467            terminal_count, 2,
4468            "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4469        );
4470    }
4471
4472    /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4473    /// even when a new user message is added while the async checkpoint comparison is in progress.
4474    ///
4475    /// This is a regression test for a bug where update_last_checkpoint would fail with
4476    /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4477    /// update_last_checkpoint started and when its async closure ran.
4478    #[gpui::test]
4479    async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4480        init_test(cx);
4481
4482        let fs = FakeFs::new(cx.executor());
4483        fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4484            .await;
4485        let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4486
4487        let handler_done = Arc::new(AtomicBool::new(false));
4488        let handler_done_clone = handler_done.clone();
4489        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4490            move |_, _thread, _cx| {
4491                handler_done_clone.store(true, SeqCst);
4492                async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4493            },
4494        ));
4495
4496        let thread = cx
4497            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
4498            .await
4499            .unwrap();
4500
4501        let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4502        let send_task = cx.background_executor.spawn(send_future);
4503
4504        // Tick until handler completes, then a few more to let update_last_checkpoint start
4505        while !handler_done.load(SeqCst) {
4506            cx.executor().tick();
4507        }
4508        for _ in 0..5 {
4509            cx.executor().tick();
4510        }
4511
4512        thread.update(cx, |thread, cx| {
4513            thread.push_entry(
4514                AgentThreadEntry::UserMessage(UserMessage {
4515                    id: Some(UserMessageId::new()),
4516                    content: ContentBlock::Empty,
4517                    chunks: vec!["Injected message (no checkpoint)".into()],
4518                    checkpoint: None,
4519                    indented: false,
4520                }),
4521                cx,
4522            );
4523        });
4524
4525        cx.run_until_parked();
4526        let result = send_task.await;
4527
4528        assert!(
4529            result.is_ok(),
4530            "send should succeed even when new message added during update_last_checkpoint: {:?}",
4531            result.err()
4532        );
4533    }
4534
4535    /// Tests that when a follow-up message is sent during generation,
4536    /// the first turn completing does NOT clear `running_turn` because
4537    /// it now belongs to the second turn.
4538    #[gpui::test]
4539    async fn test_follow_up_message_during_generation_does_not_clear_turn(cx: &mut TestAppContext) {
4540        init_test(cx);
4541
4542        let fs = FakeFs::new(cx.executor());
4543        let project = Project::test(fs, [], cx).await;
4544
4545        // First handler waits for this signal before completing
4546        let (first_complete_tx, first_complete_rx) = futures::channel::oneshot::channel::<()>();
4547        let first_complete_rx = RefCell::new(Some(first_complete_rx));
4548
4549        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4550            move |params, _thread, _cx| {
4551                let first_complete_rx = first_complete_rx.borrow_mut().take();
4552                let is_first = params
4553                    .prompt
4554                    .iter()
4555                    .any(|c| matches!(c, acp::ContentBlock::Text(t) if t.text.contains("first")));
4556
4557                async move {
4558                    if is_first {
4559                        // First handler waits until signaled
4560                        if let Some(rx) = first_complete_rx {
4561                            rx.await.ok();
4562                        }
4563                    }
4564                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4565                }
4566                .boxed_local()
4567            }
4568        }));
4569
4570        let thread = cx
4571            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
4572            .await
4573            .unwrap();
4574
4575        // Send first message (turn_id=1) - handler will block
4576        let first_request = thread.update(cx, |thread, cx| thread.send_raw("first", cx));
4577        assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 1);
4578
4579        // Send second message (turn_id=2) while first is still blocked
4580        // This calls cancel() which takes turn 1's running_turn and sets turn 2's
4581        let second_request = thread.update(cx, |thread, cx| thread.send_raw("second", cx));
4582        assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 2);
4583
4584        let running_turn_after_second_send =
4585            thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4586        assert_eq!(
4587            running_turn_after_second_send,
4588            Some(2),
4589            "running_turn should be set to turn 2 after sending second message"
4590        );
4591
4592        // Now signal first handler to complete
4593        first_complete_tx.send(()).ok();
4594
4595        // First request completes - should NOT clear running_turn
4596        // because running_turn now belongs to turn 2
4597        first_request.await.unwrap();
4598
4599        let running_turn_after_first =
4600            thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4601        assert_eq!(
4602            running_turn_after_first,
4603            Some(2),
4604            "first turn completing should not clear running_turn (belongs to turn 2)"
4605        );
4606
4607        // Second request completes - SHOULD clear running_turn
4608        second_request.await.unwrap();
4609
4610        let running_turn_after_second =
4611            thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4612        assert!(
4613            !running_turn_after_second,
4614            "second turn completing should clear running_turn"
4615        );
4616    }
4617
4618    #[gpui::test]
4619    async fn test_send_returns_cancelled_response_and_marks_tools_as_cancelled(
4620        cx: &mut TestAppContext,
4621    ) {
4622        init_test(cx);
4623
4624        let fs = FakeFs::new(cx.executor());
4625        let project = Project::test(fs, [], cx).await;
4626
4627        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4628            move |_params, thread, mut cx| {
4629                async move {
4630                    thread
4631                        .update(&mut cx, |thread, cx| {
4632                            thread.handle_session_update(
4633                                acp::SessionUpdate::ToolCall(
4634                                    acp::ToolCall::new(
4635                                        acp::ToolCallId::new("test-tool"),
4636                                        "Test Tool",
4637                                    )
4638                                    .kind(acp::ToolKind::Fetch)
4639                                    .status(acp::ToolCallStatus::InProgress),
4640                                ),
4641                                cx,
4642                            )
4643                        })
4644                        .unwrap()
4645                        .unwrap();
4646
4647                    Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
4648                }
4649                .boxed_local()
4650            },
4651        ));
4652
4653        let thread = cx
4654            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
4655            .await
4656            .unwrap();
4657
4658        let response = thread
4659            .update(cx, |thread, cx| thread.send_raw("test message", cx))
4660            .await;
4661
4662        let response = response
4663            .expect("send should succeed")
4664            .expect("should have response");
4665        assert_eq!(
4666            response.stop_reason,
4667            acp::StopReason::Cancelled,
4668            "response should have Cancelled stop_reason"
4669        );
4670
4671        thread.read_with(cx, |thread, _| {
4672            let tool_entry = thread
4673                .entries
4674                .iter()
4675                .find_map(|e| {
4676                    if let AgentThreadEntry::ToolCall(call) = e {
4677                        Some(call)
4678                    } else {
4679                        None
4680                    }
4681                })
4682                .expect("should have tool call entry");
4683
4684            assert!(
4685                matches!(tool_entry.status, ToolCallStatus::Canceled),
4686                "tool should be marked as Canceled when response is Cancelled, got {:?}",
4687                tool_entry.status
4688            );
4689        });
4690    }
4691
4692    #[gpui::test]
4693    async fn test_provisional_title_replaced_by_real_title(cx: &mut TestAppContext) {
4694        init_test(cx);
4695
4696        let fs = FakeFs::new(cx.executor());
4697        let project = Project::test(fs, [], cx).await;
4698        let connection = Rc::new(FakeAgentConnection::new());
4699        let set_title_calls = connection.set_title_calls.clone();
4700
4701        let thread = cx
4702            .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
4703            .await
4704            .unwrap();
4705
4706        // Initial title is the default.
4707        thread.read_with(cx, |thread, _| {
4708            assert_eq!(thread.title().as_ref(), "Test");
4709        });
4710
4711        // Setting a provisional title updates the display title.
4712        thread.update(cx, |thread, cx| {
4713            thread.set_provisional_title("Hello, can you help…".into(), cx);
4714        });
4715        thread.read_with(cx, |thread, _| {
4716            assert_eq!(thread.title().as_ref(), "Hello, can you help…");
4717        });
4718
4719        // The provisional title should NOT have propagated to the connection.
4720        assert_eq!(
4721            set_title_calls.borrow().len(),
4722            0,
4723            "provisional title should not propagate to the connection"
4724        );
4725
4726        // When the real title arrives via set_title, it replaces the
4727        // provisional title and propagates to the connection.
4728        let task = thread.update(cx, |thread, cx| {
4729            thread.set_title("Helping with Rust question".into(), cx)
4730        });
4731        task.await.expect("set_title should succeed");
4732        thread.read_with(cx, |thread, _| {
4733            assert_eq!(thread.title().as_ref(), "Helping with Rust question");
4734        });
4735        assert_eq!(
4736            set_title_calls.borrow().as_slice(),
4737            &[SharedString::from("Helping with Rust question")],
4738            "real title should propagate to the connection"
4739        );
4740    }
4741}