acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5
   6pub use connection::*;
   7pub use diff::*;
   8pub use mention::*;
   9use serde::{Deserialize, Serialize};
  10pub use terminal::*;
  11
  12use action_log::ActionLog;
  13use agent_client_protocol as acp;
  14use anyhow::{Context as _, Result, anyhow};
  15use chrono::{DateTime, Utc};
  16use editor::Bias;
  17use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  18use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  19use itertools::Itertools;
  20use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  21use markdown::Markdown;
  22use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  23use std::collections::HashMap;
  24use std::error::Error;
  25use std::fmt::{Formatter, Write};
  26use std::ops::Range;
  27use std::process::ExitStatus;
  28use std::rc::Rc;
  29use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  30use ui::App;
  31use util::ResultExt;
  32
/// A user-authored entry in the thread.
#[derive(Debug)]
pub struct UserMessage {
    /// Identifier of the message, when one was supplied.
    pub id: Option<UserMessageId>,
    /// Rendered content accumulated from every chunk appended to this message.
    pub content: ContentBlock,
    /// The raw protocol content blocks, in the order they were pushed.
    pub chunks: Vec<acp::ContentBlock>,
    /// Checkpoint attached to this message, if any.
    pub checkpoint: Option<Checkpoint>,
}
  40
/// A git store checkpoint that can be attached to a [`UserMessage`].
#[derive(Debug)]
pub struct Checkpoint {
    git_checkpoint: GitStoreCheckpoint,
    /// When `true`, the message's markdown heading is rendered as `## User (checkpoint)`.
    pub show: bool,
}
  46
  47impl UserMessage {
  48    fn to_markdown(&self, cx: &App) -> String {
  49        let mut markdown = String::new();
  50        if self
  51            .checkpoint
  52            .as_ref()
  53            .map_or(false, |checkpoint| checkpoint.show)
  54        {
  55            writeln!(markdown, "## User (checkpoint)").unwrap();
  56        } else {
  57            writeln!(markdown, "## User").unwrap();
  58        }
  59        writeln!(markdown).unwrap();
  60        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
  61        writeln!(markdown).unwrap();
  62        markdown
  63    }
  64}
  65
/// An assistant-authored entry, made up of message and thought chunks.
#[derive(Debug, PartialEq)]
pub struct AssistantMessage {
    /// The chunks of this message, in the order they were added.
    pub chunks: Vec<AssistantMessageChunk>,
}
  70
  71impl AssistantMessage {
  72    pub fn to_markdown(&self, cx: &App) -> String {
  73        format!(
  74            "## Assistant\n\n{}\n\n",
  75            self.chunks
  76                .iter()
  77                .map(|chunk| chunk.to_markdown(cx))
  78                .join("\n\n")
  79        )
  80    }
  81}
  82
/// A piece of an [`AssistantMessage`].
#[derive(Debug, PartialEq)]
pub enum AssistantMessageChunk {
    /// Regular message content.
    Message { block: ContentBlock },
    /// Thought content; rendered inside `<thinking>` tags in the markdown export.
    Thought { block: ContentBlock },
}
  88
  89impl AssistantMessageChunk {
  90    pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
  91        Self::Message {
  92            block: ContentBlock::new(chunk.into(), language_registry, cx),
  93        }
  94    }
  95
  96    fn to_markdown(&self, cx: &App) -> String {
  97        match self {
  98            Self::Message { block } => block.to_markdown(cx).to_string(),
  99            Self::Thought { block } => {
 100                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 101            }
 102        }
 103    }
 104}
 105
/// A single entry in an agent thread: a user message, an assistant message,
/// or a tool call.
#[derive(Debug)]
pub enum AgentThreadEntry {
    UserMessage(UserMessage),
    AssistantMessage(AssistantMessage),
    ToolCall(ToolCall),
}
 112
 113impl AgentThreadEntry {
 114    fn to_markdown(&self, cx: &App) -> String {
 115        match self {
 116            Self::UserMessage(message) => message.to_markdown(cx),
 117            Self::AssistantMessage(message) => message.to_markdown(cx),
 118            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 119        }
 120    }
 121
 122    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 123        if let AgentThreadEntry::ToolCall(call) = self {
 124            itertools::Either::Left(call.diffs())
 125        } else {
 126            itertools::Either::Right(std::iter::empty())
 127        }
 128    }
 129
 130    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 131        if let AgentThreadEntry::ToolCall(call) = self {
 132            itertools::Either::Left(call.terminals())
 133        } else {
 134            itertools::Either::Right(std::iter::empty())
 135        }
 136    }
 137
 138    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 139        if let AgentThreadEntry::ToolCall(ToolCall {
 140            locations,
 141            resolved_locations,
 142            ..
 143        }) = self
 144        {
 145            Some((
 146                locations.get(ix)?.clone(),
 147                resolved_locations.get(ix)?.clone()?,
 148            ))
 149        } else {
 150            None
 151        }
 152    }
 153}
 154
/// A tool invocation tracked as an entry in the thread.
#[derive(Debug)]
pub struct ToolCall {
    /// Protocol identifier; later updates are matched to this call by it.
    pub id: acp::ToolCallId,
    /// Markdown entity holding the call's title.
    pub label: Entity<Markdown>,
    pub kind: acp::ToolKind,
    /// Content attached to the call: content blocks, diffs, or terminals.
    pub content: Vec<ToolCallContent>,
    pub status: ToolCallStatus,
    /// Locations reported for this call.
    pub locations: Vec<acp::ToolCallLocation>,
    /// Resolved counterparts of `locations`, looked up by the same index;
    /// an element is `None` when that location could not be resolved.
    pub resolved_locations: Vec<Option<AgentLocation>>,
    pub raw_input: Option<serde_json::Value>,
    pub raw_output: Option<serde_json::Value>,
}
 167
 168impl ToolCall {
 169    fn from_acp(
 170        tool_call: acp::ToolCall,
 171        status: ToolCallStatus,
 172        language_registry: Arc<LanguageRegistry>,
 173        cx: &mut App,
 174    ) -> Self {
 175        Self {
 176            id: tool_call.id,
 177            label: cx.new(|cx| {
 178                Markdown::new(
 179                    tool_call.title.into(),
 180                    Some(language_registry.clone()),
 181                    None,
 182                    cx,
 183                )
 184            }),
 185            kind: tool_call.kind,
 186            content: tool_call
 187                .content
 188                .into_iter()
 189                .map(|content| ToolCallContent::from_acp(content, language_registry.clone(), cx))
 190                .collect(),
 191            locations: tool_call.locations,
 192            resolved_locations: Vec::default(),
 193            status,
 194            raw_input: tool_call.raw_input,
 195            raw_output: tool_call.raw_output,
 196        }
 197    }
 198
 199    fn update_fields(
 200        &mut self,
 201        fields: acp::ToolCallUpdateFields,
 202        language_registry: Arc<LanguageRegistry>,
 203        cx: &mut App,
 204    ) {
 205        let acp::ToolCallUpdateFields {
 206            kind,
 207            status,
 208            title,
 209            content,
 210            locations,
 211            raw_input,
 212            raw_output,
 213        } = fields;
 214
 215        if let Some(kind) = kind {
 216            self.kind = kind;
 217        }
 218
 219        if let Some(status) = status {
 220            self.status = ToolCallStatus::Allowed { status };
 221        }
 222
 223        if let Some(title) = title {
 224            self.label.update(cx, |label, cx| {
 225                label.replace(title, cx);
 226            });
 227        }
 228
 229        if let Some(content) = content {
 230            self.content = content
 231                .into_iter()
 232                .map(|chunk| ToolCallContent::from_acp(chunk, language_registry.clone(), cx))
 233                .collect();
 234        }
 235
 236        if let Some(locations) = locations {
 237            self.locations = locations;
 238        }
 239
 240        if let Some(raw_input) = raw_input {
 241            self.raw_input = Some(raw_input);
 242        }
 243
 244        if let Some(raw_output) = raw_output {
 245            if self.content.is_empty() {
 246                if let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 247                {
 248                    self.content
 249                        .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 250                            markdown,
 251                        }));
 252                }
 253            }
 254            self.raw_output = Some(raw_output);
 255        }
 256    }
 257
 258    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 259        self.content.iter().filter_map(|content| match content {
 260            ToolCallContent::Diff(diff) => Some(diff),
 261            ToolCallContent::ContentBlock(_) => None,
 262            ToolCallContent::Terminal(_) => None,
 263        })
 264    }
 265
 266    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 267        self.content.iter().filter_map(|content| match content {
 268            ToolCallContent::Terminal(terminal) => Some(terminal),
 269            ToolCallContent::ContentBlock(_) => None,
 270            ToolCallContent::Diff(_) => None,
 271        })
 272    }
 273
 274    fn to_markdown(&self, cx: &App) -> String {
 275        let mut markdown = format!(
 276            "**Tool Call: {}**\nStatus: {}\n\n",
 277            self.label.read(cx).source(),
 278            self.status
 279        );
 280        for content in &self.content {
 281            markdown.push_str(content.to_markdown(cx).as_str());
 282            markdown.push_str("\n\n");
 283        }
 284        markdown
 285    }
 286
 287    async fn resolve_location(
 288        location: acp::ToolCallLocation,
 289        project: WeakEntity<Project>,
 290        cx: &mut AsyncApp,
 291    ) -> Option<AgentLocation> {
 292        let buffer = project
 293            .update(cx, |project, cx| {
 294                if let Some(path) = project.project_path_for_absolute_path(&location.path, cx) {
 295                    Some(project.open_buffer(path, cx))
 296                } else {
 297                    None
 298                }
 299            })
 300            .ok()??;
 301        let buffer = buffer.await.log_err()?;
 302        let position = buffer
 303            .update(cx, |buffer, _| {
 304                if let Some(row) = location.line {
 305                    let snapshot = buffer.snapshot();
 306                    let column = snapshot.indent_size_for_line(row).len;
 307                    let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 308                    snapshot.anchor_before(point)
 309                } else {
 310                    Anchor::MIN
 311                }
 312            })
 313            .ok()?;
 314
 315        Some(AgentLocation {
 316            buffer: buffer.downgrade(),
 317            position,
 318        })
 319    }
 320
 321    fn resolve_locations(
 322        &self,
 323        project: Entity<Project>,
 324        cx: &mut App,
 325    ) -> Task<Vec<Option<AgentLocation>>> {
 326        let locations = self.locations.clone();
 327        project.update(cx, |_, cx| {
 328            cx.spawn(async move |project, cx| {
 329                let mut new_locations = Vec::new();
 330                for location in locations {
 331                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 332                }
 333                new_locations
 334            })
 335        })
 336    }
 337}
 338
/// The state of a [`ToolCall`] as tracked by the thread.
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The call is waiting for one of `options` to be chosen.
    WaitingForConfirmation {
        options: Vec<acp::PermissionOption>,
        // NOTE(review): presumably the chosen option id is sent through this
        // channel — confirm against the code that handles authorization.
        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
    },
    /// The call is allowed; `status` carries the protocol-level status.
    Allowed {
        status: acp::ToolCallStatus,
    },
    Rejected,
    Canceled,
}
 351
 352impl Display for ToolCallStatus {
 353    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 354        write!(
 355            f,
 356            "{}",
 357            match self {
 358                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 359                ToolCallStatus::Allowed { status } => match status {
 360                    acp::ToolCallStatus::Pending => "Pending",
 361                    acp::ToolCallStatus::InProgress => "In Progress",
 362                    acp::ToolCallStatus::Completed => "Completed",
 363                    acp::ToolCallStatus::Failed => "Failed",
 364                },
 365                ToolCallStatus::Rejected => "Rejected",
 366                ToolCallStatus::Canceled => "Canceled",
 367            }
 368        )
 369    }
 370}
 371
/// Displayable content accumulated from protocol content blocks.
#[derive(Debug, PartialEq, Clone)]
pub enum ContentBlock {
    /// Nothing has been appended yet.
    Empty,
    /// Content held in a markdown entity.
    Markdown { markdown: Entity<Markdown> },
    /// A resource link that was the first block appended to an empty block.
    ResourceLink { resource_link: acp::ResourceLink },
}
 378
 379impl ContentBlock {
    /// Creates a content block from a single protocol block by appending it
    /// to an empty block.
    pub fn new(
        block: acp::ContentBlock,
        language_registry: &Arc<LanguageRegistry>,
        cx: &mut App,
    ) -> Self {
        let mut this = Self::Empty;
        this.append(block, language_registry, cx);
        this
    }
 389
 390    pub fn new_combined(
 391        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 392        language_registry: Arc<LanguageRegistry>,
 393        cx: &mut App,
 394    ) -> Self {
 395        let mut this = Self::Empty;
 396        for block in blocks {
 397            this.append(block, &language_registry, cx);
 398        }
 399        this
 400    }
 401
 402    pub fn append(
 403        &mut self,
 404        block: acp::ContentBlock,
 405        language_registry: &Arc<LanguageRegistry>,
 406        cx: &mut App,
 407    ) {
 408        if matches!(self, ContentBlock::Empty) {
 409            if let acp::ContentBlock::ResourceLink(resource_link) = block {
 410                *self = ContentBlock::ResourceLink { resource_link };
 411                return;
 412            }
 413        }
 414
 415        let new_content = self.block_string_contents(block);
 416
 417        match self {
 418            ContentBlock::Empty => {
 419                *self = Self::create_markdown_block(new_content, language_registry, cx);
 420            }
 421            ContentBlock::Markdown { markdown } => {
 422                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 423            }
 424            ContentBlock::ResourceLink { resource_link } => {
 425                let existing_content = Self::resource_link_md(&resource_link.uri);
 426                let combined = format!("{}\n{}", existing_content, new_content);
 427
 428                *self = Self::create_markdown_block(combined, language_registry, cx);
 429            }
 430        }
 431    }
 432
 433    fn create_markdown_block(
 434        content: String,
 435        language_registry: &Arc<LanguageRegistry>,
 436        cx: &mut App,
 437    ) -> ContentBlock {
 438        ContentBlock::Markdown {
 439            markdown: cx
 440                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 441        }
 442    }
 443
 444    fn block_string_contents(&self, block: acp::ContentBlock) -> String {
 445        match block {
 446            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 447            acp::ContentBlock::ResourceLink(resource_link) => {
 448                Self::resource_link_md(&resource_link.uri)
 449            }
 450            acp::ContentBlock::Resource(acp::EmbeddedResource {
 451                resource:
 452                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 453                        uri,
 454                        ..
 455                    }),
 456                ..
 457            }) => Self::resource_link_md(&uri),
 458            acp::ContentBlock::Image(image) => Self::image_md(&image),
 459            acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
 460        }
 461    }
 462
 463    fn resource_link_md(uri: &str) -> String {
 464        if let Some(uri) = MentionUri::parse(&uri).log_err() {
 465            uri.as_link().to_string()
 466        } else {
 467            uri.to_string()
 468        }
 469    }
 470
    /// Returns the fixed markdown placeholder used for images; the image
    /// itself is not inspected.
    fn image_md(_image: &acp::ImageContent) -> String {
        "`Image`".into()
    }
 474
    /// Returns the markdown source of this block: an empty string for
    /// `Empty`, the markdown entity's source for `Markdown`, and the raw URI
    /// for `ResourceLink`.
    fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
        match self {
            ContentBlock::Empty => "",
            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
        }
    }
 482
 483    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 484        match self {
 485            ContentBlock::Empty => None,
 486            ContentBlock::Markdown { markdown } => Some(markdown),
 487            ContentBlock::ResourceLink { .. } => None,
 488        }
 489    }
 490
 491    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 492        match self {
 493            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 494            _ => None,
 495        }
 496    }
 497}
 498
/// One piece of content attached to a [`ToolCall`].
#[derive(Debug)]
pub enum ToolCallContent {
    ContentBlock(ContentBlock),
    Diff(Entity<Diff>),
    Terminal(Entity<Terminal>),
}
 505
 506impl ToolCallContent {
 507    pub fn from_acp(
 508        content: acp::ToolCallContent,
 509        language_registry: Arc<LanguageRegistry>,
 510        cx: &mut App,
 511    ) -> Self {
 512        match content {
 513            acp::ToolCallContent::Content { content } => {
 514                Self::ContentBlock(ContentBlock::new(content, &language_registry, cx))
 515            }
 516            acp::ToolCallContent::Diff { diff } => {
 517                Self::Diff(cx.new(|cx| Diff::from_acp(diff, language_registry, cx)))
 518            }
 519        }
 520    }
 521
 522    pub fn to_markdown(&self, cx: &App) -> String {
 523        match self {
 524            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 525            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 526            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 527        }
 528    }
 529}
 530
/// An update targeting an existing tool call.
#[derive(Debug, PartialEq)]
pub enum ToolCallUpdate {
    /// A partial field update coming from the protocol.
    UpdateFields(acp::ToolCallUpdate),
    /// Replaces the call's content with a single diff.
    UpdateDiff(ToolCallUpdateDiff),
    /// Replaces the call's content with a single terminal.
    UpdateTerminal(ToolCallUpdateTerminal),
}
 537
 538impl ToolCallUpdate {
 539    fn id(&self) -> &acp::ToolCallId {
 540        match self {
 541            Self::UpdateFields(update) => &update.id,
 542            Self::UpdateDiff(diff) => &diff.id,
 543            Self::UpdateTerminal(terminal) => &terminal.id,
 544        }
 545    }
 546}
 547
/// Wraps a protocol update as [`ToolCallUpdate::UpdateFields`].
impl From<acp::ToolCallUpdate> for ToolCallUpdate {
    fn from(update: acp::ToolCallUpdate) -> Self {
        Self::UpdateFields(update)
    }
}
 553
/// Wraps a diff update as [`ToolCallUpdate::UpdateDiff`].
impl From<ToolCallUpdateDiff> for ToolCallUpdate {
    fn from(diff: ToolCallUpdateDiff) -> Self {
        Self::UpdateDiff(diff)
    }
}
 559
/// An update that attaches a diff to the tool call identified by `id`.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateDiff {
    pub id: acp::ToolCallId,
    pub diff: Entity<Diff>,
}
 565
/// Wraps a terminal update as [`ToolCallUpdate::UpdateTerminal`].
impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
    fn from(terminal: ToolCallUpdateTerminal) -> Self {
        Self::UpdateTerminal(terminal)
    }
}
 571
/// An update that attaches a terminal to the tool call identified by `id`.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateTerminal {
    pub id: acp::ToolCallId,
    pub terminal: Entity<Terminal>,
}
 577
/// A list of plan entries; empty by default.
#[derive(Debug, Default)]
pub struct Plan {
    pub entries: Vec<PlanEntry>,
}
 582
/// Summary of a [`Plan`], as computed by `Plan::stats`.
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry whose status is in progress, if any.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Number of pending entries.
    pub pending: u32,
    /// Number of completed entries.
    pub completed: u32,
}
 589
 590impl Plan {
 591    pub fn is_empty(&self) -> bool {
 592        self.entries.is_empty()
 593    }
 594
 595    pub fn stats(&self) -> PlanStats<'_> {
 596        let mut stats = PlanStats {
 597            in_progress_entry: None,
 598            pending: 0,
 599            completed: 0,
 600        };
 601
 602        for entry in &self.entries {
 603            match &entry.status {
 604                acp::PlanEntryStatus::Pending => {
 605                    stats.pending += 1;
 606                }
 607                acp::PlanEntryStatus::InProgress => {
 608                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 609                }
 610                acp::PlanEntryStatus::Completed => {
 611                    stats.completed += 1;
 612                }
 613            }
 614        }
 615
 616        stats
 617    }
 618}
 619
/// A single entry of a [`Plan`].
#[derive(Debug)]
pub struct PlanEntry {
    /// Markdown entity holding the entry's text.
    pub content: Entity<Markdown>,
    pub priority: acp::PlanEntryPriority,
    pub status: acp::PlanEntryStatus,
}
 626
 627impl PlanEntry {
 628    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 629        Self {
 630            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 631            priority: entry.priority,
 632            status: entry.status,
 633        }
 634    }
 635}
 636
/// Newtype around the name of an agent server.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct AgentServerName(pub SharedString);
 639
/// Serializable summary of a thread: agent, session id, title, and the time
/// of the last update.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AcpThreadMetadata {
    pub agent: AgentServerName,
    pub id: acp::SessionId,
    pub title: SharedString,
    pub updated_at: DateTime<Utc>,
}
 647
/// A conversation with an agent: its entries, plan, and the connection and
/// session it belongs to.
pub struct AcpThread {
    title: SharedString,
    /// User messages, assistant messages, and tool calls, in order.
    entries: Vec<AgentThreadEntry>,
    plan: Plan,
    project: Entity<Project>,
    /// Action log created for `project` when the thread is constructed.
    action_log: Entity<ActionLog>,
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    /// While this is `Some`, `status` reports the thread as not idle.
    send_task: Option<Task<()>>,
    connection: Rc<dyn AgentConnection>,
    session_id: acp::SessionId,
}
 659
/// Events emitted by an [`AcpThread`].
pub enum AcpThreadEvent {
    /// An entry was pushed onto the thread.
    NewEntry,
    /// The entry at the given index changed.
    EntryUpdated(usize),
    /// Carries the index range of the removed entries.
    EntriesRemoved(Range<usize>),
    ToolAuthorizationRequired,
    Stopped,
    Error,
    ServerExited(ExitStatus),
}
 669
/// Allows [`AcpThread`] to emit [`AcpThreadEvent`]s.
impl EventEmitter<AcpThreadEvent> for AcpThread {}
 671
/// Coarse activity state of a thread, as reported by `AcpThread::status`.
#[derive(PartialEq, Eq)]
pub enum ThreadStatus {
    /// No send task is active.
    Idle,
    /// A send task is active and the thread is waiting for a tool confirmation.
    WaitingForToolConfirmation,
    /// A send task is active and no tool confirmation is awaited.
    Generating,
}
 678
/// Errors reported when loading fails.
#[derive(Debug, Clone)]
pub enum LoadError {
    /// Only `error_message` is used by the `Display` implementation.
    Unsupported {
        error_message: SharedString,
        upgrade_message: SharedString,
        upgrade_command: String,
    },
    /// The server exited; carries the exit status.
    Exited(i32),
    /// Any other failure, described by the message.
    Other(SharedString),
}
 689
 690impl Display for LoadError {
 691    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 692        match self {
 693            LoadError::Unsupported { error_message, .. } => write!(f, "{}", error_message),
 694            LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
 695            LoadError::Other(msg) => write!(f, "{}", msg),
 696        }
 697    }
 698}
 699
/// Marks [`LoadError`] as a standard error type; all methods use their defaults.
impl Error for LoadError {}
 701
 702impl AcpThread {
 703    pub fn new(
 704        title: impl Into<SharedString>,
 705        connection: Rc<dyn AgentConnection>,
 706        project: Entity<Project>,
 707        session_id: acp::SessionId,
 708        cx: &mut Context<Self>,
 709    ) -> Self {
 710        let action_log = cx.new(|_| ActionLog::new(project.clone()));
 711
 712        Self {
 713            action_log,
 714            shared_buffers: Default::default(),
 715            entries: Default::default(),
 716            plan: Default::default(),
 717            title: title.into(),
 718            project,
 719            send_task: None,
 720            connection,
 721            session_id,
 722        }
 723    }
 724
    /// Returns the agent connection this thread uses.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }
 728
    /// Returns the thread's action log entity.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
 732
    /// Returns the project this thread belongs to.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
 736
    /// Returns a clone of the thread's title.
    pub fn title(&self) -> SharedString {
        self.title.clone()
    }
 740
    /// Returns all entries of the thread, in order.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
 744
    /// Returns the id of the session this thread belongs to.
    pub fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
 748
 749    pub fn status(&self) -> ThreadStatus {
 750        if self.send_task.is_some() {
 751            if self.waiting_for_tool_confirmation() {
 752                ThreadStatus::WaitingForToolConfirmation
 753            } else {
 754                ThreadStatus::Generating
 755            }
 756        } else {
 757            ThreadStatus::Idle
 758        }
 759    }
 760
 761    pub fn has_pending_edit_tool_calls(&self) -> bool {
 762        for entry in self.entries.iter().rev() {
 763            match entry {
 764                AgentThreadEntry::UserMessage(_) => return false,
 765                AgentThreadEntry::ToolCall(
 766                    call @ ToolCall {
 767                        status:
 768                            ToolCallStatus::Allowed {
 769                                status:
 770                                    acp::ToolCallStatus::InProgress | acp::ToolCallStatus::Pending,
 771                            },
 772                        ..
 773                    },
 774                ) if call.diffs().next().is_some() => {
 775                    return true;
 776                }
 777                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
 778            }
 779        }
 780
 781        false
 782    }
 783
 784    pub fn used_tools_since_last_user_message(&self) -> bool {
 785        for entry in self.entries.iter().rev() {
 786            match entry {
 787                AgentThreadEntry::UserMessage(..) => return false,
 788                AgentThreadEntry::AssistantMessage(..) => continue,
 789                AgentThreadEntry::ToolCall(..) => return true,
 790            }
 791        }
 792
 793        false
 794    }
 795
 796    pub fn handle_session_update(
 797        &mut self,
 798        update: acp::SessionUpdate,
 799        cx: &mut Context<Self>,
 800    ) -> Result<()> {
 801        match update {
 802            acp::SessionUpdate::UserMessageChunk { content } => {
 803                self.push_user_content_block(None, content, cx);
 804            }
 805            acp::SessionUpdate::AgentMessageChunk { content } => {
 806                self.push_assistant_content_block(content, false, cx);
 807            }
 808            acp::SessionUpdate::AgentThoughtChunk { content } => {
 809                self.push_assistant_content_block(content, true, cx);
 810            }
 811            acp::SessionUpdate::ToolCall(tool_call) => {
 812                self.upsert_tool_call(tool_call, cx);
 813            }
 814            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
 815                self.update_tool_call(tool_call_update, cx)?;
 816            }
 817            acp::SessionUpdate::Plan(plan) => {
 818                self.update_plan(plan, cx);
 819            }
 820        }
 821        Ok(())
 822    }
 823
 824    pub fn push_user_content_block(
 825        &mut self,
 826        message_id: Option<UserMessageId>,
 827        chunk: acp::ContentBlock,
 828        cx: &mut Context<Self>,
 829    ) {
 830        let language_registry = self.project.read(cx).languages().clone();
 831        let entries_len = self.entries.len();
 832
 833        if let Some(last_entry) = self.entries.last_mut()
 834            && let AgentThreadEntry::UserMessage(UserMessage {
 835                id,
 836                content,
 837                chunks,
 838                ..
 839            }) = last_entry
 840        {
 841            *id = message_id.or(id.take());
 842            content.append(chunk.clone(), &language_registry, cx);
 843            chunks.push(chunk);
 844            let idx = entries_len - 1;
 845            cx.emit(AcpThreadEvent::EntryUpdated(idx));
 846        } else {
 847            let content = ContentBlock::new(chunk.clone(), &language_registry, cx);
 848            self.push_entry(
 849                AgentThreadEntry::UserMessage(UserMessage {
 850                    id: message_id,
 851                    content,
 852                    chunks: vec![chunk],
 853                    checkpoint: None,
 854                }),
 855                cx,
 856            );
 857        }
 858    }
 859
 860    pub fn push_assistant_content_block(
 861        &mut self,
 862        chunk: acp::ContentBlock,
 863        is_thought: bool,
 864        cx: &mut Context<Self>,
 865    ) {
 866        let language_registry = self.project.read(cx).languages().clone();
 867        let entries_len = self.entries.len();
 868        if let Some(last_entry) = self.entries.last_mut()
 869            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
 870        {
 871            let idx = entries_len - 1;
 872            cx.emit(AcpThreadEvent::EntryUpdated(idx));
 873            match (chunks.last_mut(), is_thought) {
 874                (Some(AssistantMessageChunk::Message { block }), false)
 875                | (Some(AssistantMessageChunk::Thought { block }), true) => {
 876                    block.append(chunk, &language_registry, cx)
 877                }
 878                _ => {
 879                    let block = ContentBlock::new(chunk, &language_registry, cx);
 880                    if is_thought {
 881                        chunks.push(AssistantMessageChunk::Thought { block })
 882                    } else {
 883                        chunks.push(AssistantMessageChunk::Message { block })
 884                    }
 885                }
 886            }
 887        } else {
 888            let block = ContentBlock::new(chunk, &language_registry, cx);
 889            let chunk = if is_thought {
 890                AssistantMessageChunk::Thought { block }
 891            } else {
 892                AssistantMessageChunk::Message { block }
 893            };
 894
 895            self.push_entry(
 896                AgentThreadEntry::AssistantMessage(AssistantMessage {
 897                    chunks: vec![chunk],
 898                }),
 899                cx,
 900            );
 901        }
 902    }
 903
    /// Appends `entry` to the thread and emits `AcpThreadEvent::NewEntry`.
    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
 908
 909    pub fn update_tool_call(
 910        &mut self,
 911        update: impl Into<ToolCallUpdate>,
 912        cx: &mut Context<Self>,
 913    ) -> Result<()> {
 914        let update = update.into();
 915        let languages = self.project.read(cx).languages().clone();
 916
 917        let (ix, current_call) = self
 918            .tool_call_mut(update.id())
 919            .context("Tool call not found")?;
 920        match update {
 921            ToolCallUpdate::UpdateFields(update) => {
 922                let location_updated = update.fields.locations.is_some();
 923                current_call.update_fields(update.fields, languages, cx);
 924                if location_updated {
 925                    self.resolve_locations(update.id.clone(), cx);
 926                }
 927            }
 928            ToolCallUpdate::UpdateDiff(update) => {
 929                current_call.content.clear();
 930                current_call
 931                    .content
 932                    .push(ToolCallContent::Diff(update.diff));
 933            }
 934            ToolCallUpdate::UpdateTerminal(update) => {
 935                current_call.content.clear();
 936                current_call
 937                    .content
 938                    .push(ToolCallContent::Terminal(update.terminal));
 939            }
 940        }
 941
 942        cx.emit(AcpThreadEvent::EntryUpdated(ix));
 943
 944        Ok(())
 945    }
 946
 947    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
 948    pub fn upsert_tool_call(&mut self, tool_call: acp::ToolCall, cx: &mut Context<Self>) {
 949        let status = ToolCallStatus::Allowed {
 950            status: tool_call.status,
 951        };
 952        self.upsert_tool_call_inner(tool_call, status, cx)
 953    }
 954
 955    pub fn upsert_tool_call_inner(
 956        &mut self,
 957        tool_call: acp::ToolCall,
 958        status: ToolCallStatus,
 959        cx: &mut Context<Self>,
 960    ) {
 961        let language_registry = self.project.read(cx).languages().clone();
 962        let call = ToolCall::from_acp(tool_call, status, language_registry, cx);
 963        let id = call.id.clone();
 964
 965        if let Some((ix, current_call)) = self.tool_call_mut(&call.id) {
 966            *current_call = call;
 967
 968            cx.emit(AcpThreadEvent::EntryUpdated(ix));
 969        } else {
 970            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
 971        };
 972
 973        self.resolve_locations(id, cx);
 974    }
 975
 976    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
 977        // The tool call we are looking for is typically the last one, or very close to the end.
 978        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
 979        self.entries
 980            .iter_mut()
 981            .enumerate()
 982            .rev()
 983            .find_map(|(index, tool_call)| {
 984                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
 985                    && &tool_call.id == id
 986                {
 987                    Some((index, tool_call))
 988                } else {
 989                    None
 990                }
 991            })
 992    }
 993
 994    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
 995        let project = self.project.clone();
 996        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
 997            return;
 998        };
 999        let task = tool_call.resolve_locations(project, cx);
1000        cx.spawn(async move |this, cx| {
1001            let resolved_locations = task.await;
1002            this.update(cx, |this, cx| {
1003                let project = this.project.clone();
1004                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1005                    return;
1006                };
1007                if let Some(Some(location)) = resolved_locations.last() {
1008                    project.update(cx, |project, cx| {
1009                        if let Some(agent_location) = project.agent_location() {
1010                            let should_ignore = agent_location.buffer == location.buffer
1011                                && location
1012                                    .buffer
1013                                    .update(cx, |buffer, _| {
1014                                        let snapshot = buffer.snapshot();
1015                                        let old_position =
1016                                            agent_location.position.to_point(&snapshot);
1017                                        let new_position = location.position.to_point(&snapshot);
1018                                        // ignore this so that when we get updates from the edit tool
1019                                        // the position doesn't reset to the startof line
1020                                        old_position.row == new_position.row
1021                                            && old_position.column > new_position.column
1022                                    })
1023                                    .ok()
1024                                    .unwrap_or_default();
1025                            if !should_ignore {
1026                                project.set_agent_location(Some(location.clone()), cx);
1027                            }
1028                        }
1029                    });
1030                }
1031                if tool_call.resolved_locations != resolved_locations {
1032                    tool_call.resolved_locations = resolved_locations;
1033                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1034                }
1035            })
1036        })
1037        .detach();
1038    }
1039
1040    pub fn request_tool_call_authorization(
1041        &mut self,
1042        tool_call: acp::ToolCall,
1043        options: Vec<acp::PermissionOption>,
1044        cx: &mut Context<Self>,
1045    ) -> oneshot::Receiver<acp::PermissionOptionId> {
1046        let (tx, rx) = oneshot::channel();
1047
1048        let status = ToolCallStatus::WaitingForConfirmation {
1049            options,
1050            respond_tx: tx,
1051        };
1052
1053        self.upsert_tool_call_inner(tool_call, status, cx);
1054        cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1055        rx
1056    }
1057
1058    pub fn authorize_tool_call(
1059        &mut self,
1060        id: acp::ToolCallId,
1061        option_id: acp::PermissionOptionId,
1062        option_kind: acp::PermissionOptionKind,
1063        cx: &mut Context<Self>,
1064    ) {
1065        let Some((ix, call)) = self.tool_call_mut(&id) else {
1066            return;
1067        };
1068
1069        let new_status = match option_kind {
1070            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1071                ToolCallStatus::Rejected
1072            }
1073            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1074                ToolCallStatus::Allowed {
1075                    status: acp::ToolCallStatus::InProgress,
1076                }
1077            }
1078        };
1079
1080        let curr_status = mem::replace(&mut call.status, new_status);
1081
1082        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1083            respond_tx.send(option_id).log_err();
1084        } else if cfg!(debug_assertions) {
1085            panic!("tried to authorize an already authorized tool call");
1086        }
1087
1088        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1089    }
1090
1091    /// Returns true if the last turn is awaiting tool authorization
1092    pub fn waiting_for_tool_confirmation(&self) -> bool {
1093        for entry in self.entries.iter().rev() {
1094            match &entry {
1095                AgentThreadEntry::ToolCall(call) => match call.status {
1096                    ToolCallStatus::WaitingForConfirmation { .. } => return true,
1097                    ToolCallStatus::Allowed { .. }
1098                    | ToolCallStatus::Rejected
1099                    | ToolCallStatus::Canceled => continue,
1100                },
1101                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1102                    // Reached the beginning of the turn
1103                    return false;
1104                }
1105            }
1106        }
1107        false
1108    }
1109
1110    pub fn plan(&self) -> &Plan {
1111        &self.plan
1112    }
1113
1114    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1115        let new_entries_len = request.entries.len();
1116        let mut new_entries = request.entries.into_iter();
1117
1118        // Reuse existing markdown to prevent flickering
1119        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1120            let PlanEntry {
1121                content,
1122                priority,
1123                status,
1124            } = old;
1125            content.update(cx, |old, cx| {
1126                old.replace(new.content, cx);
1127            });
1128            *priority = new.priority;
1129            *status = new.status;
1130        }
1131        for new in new_entries {
1132            self.plan.entries.push(PlanEntry::from_acp(new, cx))
1133        }
1134        self.plan.entries.truncate(new_entries_len);
1135
1136        cx.notify();
1137    }
1138
1139    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1140        self.plan
1141            .entries
1142            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1143        cx.notify();
1144    }
1145
1146    #[cfg(any(test, feature = "test-support"))]
1147    pub fn send_raw(
1148        &mut self,
1149        message: &str,
1150        cx: &mut Context<Self>,
1151    ) -> BoxFuture<'static, Result<()>> {
1152        self.send(
1153            vec![acp::ContentBlock::Text(acp::TextContent {
1154                text: message.to_string(),
1155                annotations: None,
1156            })],
1157            cx,
1158        )
1159    }
1160
1161    pub fn send(
1162        &mut self,
1163        message: Vec<acp::ContentBlock>,
1164        cx: &mut Context<Self>,
1165    ) -> BoxFuture<'static, Result<()>> {
1166        let block = ContentBlock::new_combined(
1167            message.clone(),
1168            self.project.read(cx).languages().clone(),
1169            cx,
1170        );
1171        let request = acp::PromptRequest {
1172            prompt: message.clone(),
1173            session_id: self.session_id.clone(),
1174        };
1175        let git_store = self.project.read(cx).git_store().clone();
1176
1177        let message_id = if self
1178            .connection
1179            .session_editor(&self.session_id, cx)
1180            .is_some()
1181        {
1182            Some(UserMessageId::new())
1183        } else {
1184            None
1185        };
1186        self.push_entry(
1187            AgentThreadEntry::UserMessage(UserMessage {
1188                id: message_id.clone(),
1189                content: block,
1190                chunks: message,
1191                checkpoint: None,
1192            }),
1193            cx,
1194        );
1195
1196        self.run_turn(cx, async move |this, cx| {
1197            let old_checkpoint = git_store
1198                .update(cx, |git, cx| git.checkpoint(cx))?
1199                .await
1200                .context("failed to get old checkpoint")
1201                .log_err();
1202            this.update(cx, |this, cx| {
1203                if let Some((_ix, message)) = this.last_user_message() {
1204                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1205                        git_checkpoint,
1206                        show: false,
1207                    });
1208                }
1209                this.connection.prompt(message_id, request, cx)
1210            })?
1211            .await
1212        })
1213    }
1214
1215    pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1216        self.run_turn(cx, async move |this, cx| {
1217            this.update(cx, |this, cx| {
1218                this.connection
1219                    .resume(&this.session_id, cx)
1220                    .map(|resume| resume.run(cx))
1221            })?
1222            .context("resuming a session is not supported")?
1223            .await
1224        })
1225    }
1226
1227    fn run_turn(
1228        &mut self,
1229        cx: &mut Context<Self>,
1230        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1231    ) -> BoxFuture<'static, Result<()>> {
1232        self.clear_completed_plan_entries(cx);
1233
1234        let (tx, rx) = oneshot::channel();
1235        let cancel_task = self.cancel(cx);
1236
1237        self.send_task = Some(cx.spawn(async move |this, cx| {
1238            cancel_task.await;
1239            tx.send(f(this, cx).await).ok();
1240        }));
1241
1242        cx.spawn(async move |this, cx| {
1243            let response = rx.await;
1244
1245            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1246                .await?;
1247
1248            this.update(cx, |this, cx| {
1249                match response {
1250                    Ok(Err(e)) => {
1251                        this.send_task.take();
1252                        cx.emit(AcpThreadEvent::Error);
1253                        Err(e)
1254                    }
1255                    result => {
1256                        let cancelled = matches!(
1257                            result,
1258                            Ok(Ok(acp::PromptResponse {
1259                                stop_reason: acp::StopReason::Cancelled
1260                            }))
1261                        );
1262
1263                        // We only take the task if the current prompt wasn't cancelled.
1264                        //
1265                        // This prompt may have been cancelled because another one was sent
1266                        // while it was still generating. In these cases, dropping `send_task`
1267                        // would cause the next generation to be cancelled.
1268                        if !cancelled {
1269                            this.send_task.take();
1270                        }
1271
1272                        cx.emit(AcpThreadEvent::Stopped);
1273                        Ok(())
1274                    }
1275                }
1276            })?
1277        })
1278        .boxed()
1279    }
1280
1281    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1282        let Some(send_task) = self.send_task.take() else {
1283            return Task::ready(());
1284        };
1285
1286        for entry in self.entries.iter_mut() {
1287            if let AgentThreadEntry::ToolCall(call) = entry {
1288                let cancel = matches!(
1289                    call.status,
1290                    ToolCallStatus::WaitingForConfirmation { .. }
1291                        | ToolCallStatus::Allowed {
1292                            status: acp::ToolCallStatus::InProgress
1293                        }
1294                );
1295
1296                if cancel {
1297                    call.status = ToolCallStatus::Canceled;
1298                }
1299            }
1300        }
1301
1302        self.connection.cancel(&self.session_id, cx);
1303
1304        // Wait for the send task to complete
1305        cx.foreground_executor().spawn(send_task)
1306    }
1307
1308    /// Rewinds this thread to before the entry at `index`, removing it and all
1309    /// subsequent entries while reverting any changes made from that point.
1310    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1311        let Some(session_editor) = self.connection.session_editor(&self.session_id, cx) else {
1312            return Task::ready(Err(anyhow!("not supported")));
1313        };
1314        let Some(message) = self.user_message(&id) else {
1315            return Task::ready(Err(anyhow!("message not found")));
1316        };
1317
1318        let checkpoint = message
1319            .checkpoint
1320            .as_ref()
1321            .map(|c| c.git_checkpoint.clone());
1322
1323        let git_store = self.project.read(cx).git_store().clone();
1324        cx.spawn(async move |this, cx| {
1325            if let Some(checkpoint) = checkpoint {
1326                git_store
1327                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1328                    .await?;
1329            }
1330
1331            cx.update(|cx| session_editor.truncate(id.clone(), cx))?
1332                .await?;
1333            this.update(cx, |this, cx| {
1334                if let Some((ix, _)) = this.user_message_mut(&id) {
1335                    let range = ix..this.entries.len();
1336                    this.entries.truncate(ix);
1337                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
1338                }
1339            })
1340        })
1341    }
1342
1343    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1344        let git_store = self.project.read(cx).git_store().clone();
1345
1346        let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1347            if let Some(checkpoint) = message.checkpoint.as_ref() {
1348                checkpoint.git_checkpoint.clone()
1349            } else {
1350                return Task::ready(Ok(()));
1351            }
1352        } else {
1353            return Task::ready(Ok(()));
1354        };
1355
1356        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1357        cx.spawn(async move |this, cx| {
1358            let new_checkpoint = new_checkpoint
1359                .await
1360                .context("failed to get new checkpoint")
1361                .log_err();
1362            if let Some(new_checkpoint) = new_checkpoint {
1363                let equal = git_store
1364                    .update(cx, |git, cx| {
1365                        git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1366                    })?
1367                    .await
1368                    .unwrap_or(true);
1369                this.update(cx, |this, cx| {
1370                    let (ix, message) = this.last_user_message().context("no user message")?;
1371                    let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1372                    checkpoint.show = !equal;
1373                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1374                    anyhow::Ok(())
1375                })??;
1376            }
1377
1378            Ok(())
1379        })
1380    }
1381
1382    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1383        self.entries
1384            .iter_mut()
1385            .enumerate()
1386            .rev()
1387            .find_map(|(ix, entry)| {
1388                if let AgentThreadEntry::UserMessage(message) = entry {
1389                    Some((ix, message))
1390                } else {
1391                    None
1392                }
1393            })
1394    }
1395
1396    fn user_message(&self, id: &UserMessageId) -> Option<&UserMessage> {
1397        self.entries.iter().find_map(|entry| {
1398            if let AgentThreadEntry::UserMessage(message) = entry {
1399                if message.id.as_ref() == Some(&id) {
1400                    Some(message)
1401                } else {
1402                    None
1403                }
1404            } else {
1405                None
1406            }
1407        })
1408    }
1409
1410    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1411        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1412            if let AgentThreadEntry::UserMessage(message) = entry {
1413                if message.id.as_ref() == Some(&id) {
1414                    Some((ix, message))
1415                } else {
1416                    None
1417                }
1418            } else {
1419                None
1420            }
1421        })
1422    }
1423
1424    pub fn read_text_file(
1425        &self,
1426        path: PathBuf,
1427        line: Option<u32>,
1428        limit: Option<u32>,
1429        reuse_shared_snapshot: bool,
1430        cx: &mut Context<Self>,
1431    ) -> Task<Result<String>> {
1432        let project = self.project.clone();
1433        let action_log = self.action_log.clone();
1434        cx.spawn(async move |this, cx| {
1435            let load = project.update(cx, |project, cx| {
1436                let path = project
1437                    .project_path_for_absolute_path(&path, cx)
1438                    .context("invalid path")?;
1439                anyhow::Ok(project.open_buffer(path, cx))
1440            });
1441            let buffer = load??.await?;
1442
1443            let snapshot = if reuse_shared_snapshot {
1444                this.read_with(cx, |this, _| {
1445                    this.shared_buffers.get(&buffer.clone()).cloned()
1446                })
1447                .log_err()
1448                .flatten()
1449            } else {
1450                None
1451            };
1452
1453            let snapshot = if let Some(snapshot) = snapshot {
1454                snapshot
1455            } else {
1456                action_log.update(cx, |action_log, cx| {
1457                    action_log.buffer_read(buffer.clone(), cx);
1458                })?;
1459                project.update(cx, |project, cx| {
1460                    let position = buffer
1461                        .read(cx)
1462                        .snapshot()
1463                        .anchor_before(Point::new(line.unwrap_or_default(), 0));
1464                    project.set_agent_location(
1465                        Some(AgentLocation {
1466                            buffer: buffer.downgrade(),
1467                            position,
1468                        }),
1469                        cx,
1470                    );
1471                })?;
1472
1473                buffer.update(cx, |buffer, _| buffer.snapshot())?
1474            };
1475
1476            this.update(cx, |this, _| {
1477                let text = snapshot.text();
1478                this.shared_buffers.insert(buffer.clone(), snapshot);
1479                if line.is_none() && limit.is_none() {
1480                    return Ok(text);
1481                }
1482                let limit = limit.unwrap_or(u32::MAX) as usize;
1483                let Some(line) = line else {
1484                    return Ok(text.lines().take(limit).collect::<String>());
1485                };
1486
1487                let count = text.lines().count();
1488                if count < line as usize {
1489                    anyhow::bail!("There are only {} lines", count);
1490                }
1491                Ok(text
1492                    .lines()
1493                    .skip(line as usize + 1)
1494                    .take(limit)
1495                    .collect::<String>())
1496            })?
1497        })
1498    }
1499
1500    pub fn write_text_file(
1501        &self,
1502        path: PathBuf,
1503        content: String,
1504        cx: &mut Context<Self>,
1505    ) -> Task<Result<()>> {
1506        let project = self.project.clone();
1507        let action_log = self.action_log.clone();
1508        cx.spawn(async move |this, cx| {
1509            let load = project.update(cx, |project, cx| {
1510                let path = project
1511                    .project_path_for_absolute_path(&path, cx)
1512                    .context("invalid path")?;
1513                anyhow::Ok(project.open_buffer(path, cx))
1514            });
1515            let buffer = load??.await?;
1516            let snapshot = this.update(cx, |this, cx| {
1517                this.shared_buffers
1518                    .get(&buffer)
1519                    .cloned()
1520                    .unwrap_or_else(|| buffer.read(cx).snapshot())
1521            })?;
1522            let edits = cx
1523                .background_executor()
1524                .spawn(async move {
1525                    let old_text = snapshot.text();
1526                    text_diff(old_text.as_str(), &content)
1527                        .into_iter()
1528                        .map(|(range, replacement)| {
1529                            (
1530                                snapshot.anchor_after(range.start)
1531                                    ..snapshot.anchor_before(range.end),
1532                                replacement,
1533                            )
1534                        })
1535                        .collect::<Vec<_>>()
1536                })
1537                .await;
1538            cx.update(|cx| {
1539                project.update(cx, |project, cx| {
1540                    project.set_agent_location(
1541                        Some(AgentLocation {
1542                            buffer: buffer.downgrade(),
1543                            position: edits
1544                                .last()
1545                                .map(|(range, _)| range.end)
1546                                .unwrap_or(Anchor::MIN),
1547                        }),
1548                        cx,
1549                    );
1550                });
1551
1552                action_log.update(cx, |action_log, cx| {
1553                    action_log.buffer_read(buffer.clone(), cx);
1554                });
1555                buffer.update(cx, |buffer, cx| {
1556                    buffer.edit(edits, None, cx);
1557                });
1558                action_log.update(cx, |action_log, cx| {
1559                    action_log.buffer_edited(buffer.clone(), cx);
1560                });
1561            })?;
1562            project
1563                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
1564                .await
1565        })
1566    }
1567
1568    pub fn to_markdown(&self, cx: &App) -> String {
1569        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
1570    }
1571
1572    pub fn emit_server_exited(&mut self, status: ExitStatus, cx: &mut Context<Self>) {
1573        cx.emit(AcpThreadEvent::ServerExited(status));
1574    }
1575}
1576
1577fn markdown_for_raw_output(
1578    raw_output: &serde_json::Value,
1579    language_registry: &Arc<LanguageRegistry>,
1580    cx: &mut App,
1581) -> Option<Entity<Markdown>> {
1582    match raw_output {
1583        serde_json::Value::Null => None,
1584        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
1585            Markdown::new(
1586                value.to_string().into(),
1587                Some(language_registry.clone()),
1588                None,
1589                cx,
1590            )
1591        })),
1592        serde_json::Value::Number(value) => Some(cx.new(|cx| {
1593            Markdown::new(
1594                value.to_string().into(),
1595                Some(language_registry.clone()),
1596                None,
1597                cx,
1598            )
1599        })),
1600        serde_json::Value::String(value) => Some(cx.new(|cx| {
1601            Markdown::new(
1602                value.clone().into(),
1603                Some(language_registry.clone()),
1604                None,
1605                cx,
1606            )
1607        })),
1608        value => Some(cx.new(|cx| {
1609            Markdown::new(
1610                format!("```json\n{}\n```", value).into(),
1611                Some(language_registry.clone()),
1612                None,
1613                cx,
1614            )
1615        })),
1616    }
1617}
1618
1619#[cfg(test)]
1620mod tests {
1621    use super::*;
1622    use anyhow::anyhow;
1623    use futures::{channel::mpsc, future::LocalBoxFuture, select};
1624    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
1625    use indoc::indoc;
1626    use project::{FakeFs, Fs};
1627    use rand::Rng as _;
1628    use serde_json::json;
1629    use settings::SettingsStore;
1630    use smol::stream::StreamExt as _;
1631    use std::{
1632        any::Any,
1633        cell::RefCell,
1634        path::Path,
1635        rc::Rc,
1636        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
1637        time::Duration,
1638    };
1639    use util::path;
1640
1641    fn init_test(cx: &mut TestAppContext) {
1642        env_logger::try_init().ok();
1643        cx.update(|cx| {
1644            let settings_store = SettingsStore::test(cx);
1645            cx.set_global(settings_store);
1646            Project::init_settings(cx);
1647            language::init(cx);
1648        });
1649    }
1650
1651    #[gpui::test]
1652    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
1653        init_test(cx);
1654
1655        let fs = FakeFs::new(cx.executor());
1656        let project = Project::test(fs, [], cx).await;
1657        let connection = Rc::new(FakeAgentConnection::new());
1658        let thread = cx
1659            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1660            .await
1661            .unwrap();
1662
1663        // Test creating a new user message
1664        thread.update(cx, |thread, cx| {
1665            thread.push_user_content_block(
1666                None,
1667                acp::ContentBlock::Text(acp::TextContent {
1668                    annotations: None,
1669                    text: "Hello, ".to_string(),
1670                }),
1671                cx,
1672            );
1673        });
1674
1675        thread.update(cx, |thread, cx| {
1676            assert_eq!(thread.entries.len(), 1);
1677            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1678                assert_eq!(user_msg.id, None);
1679                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
1680            } else {
1681                panic!("Expected UserMessage");
1682            }
1683        });
1684
1685        // Test appending to existing user message
1686        let message_1_id = UserMessageId::new();
1687        thread.update(cx, |thread, cx| {
1688            thread.push_user_content_block(
1689                Some(message_1_id.clone()),
1690                acp::ContentBlock::Text(acp::TextContent {
1691                    annotations: None,
1692                    text: "world!".to_string(),
1693                }),
1694                cx,
1695            );
1696        });
1697
1698        thread.update(cx, |thread, cx| {
1699            assert_eq!(thread.entries.len(), 1);
1700            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1701                assert_eq!(user_msg.id, Some(message_1_id));
1702                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
1703            } else {
1704                panic!("Expected UserMessage");
1705            }
1706        });
1707
1708        // Test creating new user message after assistant message
1709        thread.update(cx, |thread, cx| {
1710            thread.push_assistant_content_block(
1711                acp::ContentBlock::Text(acp::TextContent {
1712                    annotations: None,
1713                    text: "Assistant response".to_string(),
1714                }),
1715                false,
1716                cx,
1717            );
1718        });
1719
1720        let message_2_id = UserMessageId::new();
1721        thread.update(cx, |thread, cx| {
1722            thread.push_user_content_block(
1723                Some(message_2_id.clone()),
1724                acp::ContentBlock::Text(acp::TextContent {
1725                    annotations: None,
1726                    text: "New user message".to_string(),
1727                }),
1728                cx,
1729            );
1730        });
1731
1732        thread.update(cx, |thread, cx| {
1733            assert_eq!(thread.entries.len(), 3);
1734            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
1735                assert_eq!(user_msg.id, Some(message_2_id));
1736                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
1737            } else {
1738                panic!("Expected UserMessage at index 2");
1739            }
1740        });
1741    }
1742
1743    #[gpui::test]
1744    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
1745        init_test(cx);
1746
1747        let fs = FakeFs::new(cx.executor());
1748        let project = Project::test(fs, [], cx).await;
1749        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1750            |_, thread, mut cx| {
1751                async move {
1752                    thread.update(&mut cx, |thread, cx| {
1753                        thread
1754                            .handle_session_update(
1755                                acp::SessionUpdate::AgentThoughtChunk {
1756                                    content: "Thinking ".into(),
1757                                },
1758                                cx,
1759                            )
1760                            .unwrap();
1761                        thread
1762                            .handle_session_update(
1763                                acp::SessionUpdate::AgentThoughtChunk {
1764                                    content: "hard!".into(),
1765                                },
1766                                cx,
1767                            )
1768                            .unwrap();
1769                    })?;
1770                    Ok(acp::PromptResponse {
1771                        stop_reason: acp::StopReason::EndTurn,
1772                    })
1773                }
1774                .boxed_local()
1775            },
1776        ));
1777
1778        let thread = cx
1779            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1780            .await
1781            .unwrap();
1782
1783        thread
1784            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
1785            .await
1786            .unwrap();
1787
1788        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
1789        assert_eq!(
1790            output,
1791            indoc! {r#"
1792            ## User
1793
1794            Hello from Zed!
1795
1796            ## Assistant
1797
1798            <thinking>
1799            Thinking hard!
1800            </thinking>
1801
1802            "#}
1803        );
1804    }
1805
1806    #[gpui::test]
1807    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
1808        init_test(cx);
1809
1810        let fs = FakeFs::new(cx.executor());
1811        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
1812            .await;
1813        let project = Project::test(fs.clone(), [], cx).await;
1814        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
1815        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
1816        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1817            move |_, thread, mut cx| {
1818                let read_file_tx = read_file_tx.clone();
1819                async move {
1820                    let content = thread
1821                        .update(&mut cx, |thread, cx| {
1822                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
1823                        })
1824                        .unwrap()
1825                        .await
1826                        .unwrap();
1827                    assert_eq!(content, "one\ntwo\nthree\n");
1828                    read_file_tx.take().unwrap().send(()).unwrap();
1829                    thread
1830                        .update(&mut cx, |thread, cx| {
1831                            thread.write_text_file(
1832                                path!("/tmp/foo").into(),
1833                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
1834                                cx,
1835                            )
1836                        })
1837                        .unwrap()
1838                        .await
1839                        .unwrap();
1840                    Ok(acp::PromptResponse {
1841                        stop_reason: acp::StopReason::EndTurn,
1842                    })
1843                }
1844                .boxed_local()
1845            },
1846        ));
1847
1848        let (worktree, pathbuf) = project
1849            .update(cx, |project, cx| {
1850                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
1851            })
1852            .await
1853            .unwrap();
1854        let buffer = project
1855            .update(cx, |project, cx| {
1856                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
1857            })
1858            .await
1859            .unwrap();
1860
1861        let thread = cx
1862            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
1863            .await
1864            .unwrap();
1865
1866        let request = thread.update(cx, |thread, cx| {
1867            thread.send_raw("Extend the count in /tmp/foo", cx)
1868        });
1869        read_file_rx.await.ok();
1870        buffer.update(cx, |buffer, cx| {
1871            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
1872        });
1873        cx.run_until_parked();
1874        assert_eq!(
1875            buffer.read_with(cx, |buffer, _| buffer.text()),
1876            "zero\none\ntwo\nthree\nfour\nfive\n"
1877        );
1878        assert_eq!(
1879            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
1880            "zero\none\ntwo\nthree\nfour\nfive\n"
1881        );
1882        request.await.unwrap();
1883    }
1884
1885    #[gpui::test]
1886    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
1887        init_test(cx);
1888
1889        let fs = FakeFs::new(cx.executor());
1890        let project = Project::test(fs, [], cx).await;
1891        let id = acp::ToolCallId("test".into());
1892
1893        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
1894            let id = id.clone();
1895            move |_, thread, mut cx| {
1896                let id = id.clone();
1897                async move {
1898                    thread
1899                        .update(&mut cx, |thread, cx| {
1900                            thread.handle_session_update(
1901                                acp::SessionUpdate::ToolCall(acp::ToolCall {
1902                                    id: id.clone(),
1903                                    title: "Label".into(),
1904                                    kind: acp::ToolKind::Fetch,
1905                                    status: acp::ToolCallStatus::InProgress,
1906                                    content: vec![],
1907                                    locations: vec![],
1908                                    raw_input: None,
1909                                    raw_output: None,
1910                                }),
1911                                cx,
1912                            )
1913                        })
1914                        .unwrap()
1915                        .unwrap();
1916                    Ok(acp::PromptResponse {
1917                        stop_reason: acp::StopReason::EndTurn,
1918                    })
1919                }
1920                .boxed_local()
1921            }
1922        }));
1923
1924        let thread = cx
1925            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1926            .await
1927            .unwrap();
1928
1929        let request = thread.update(cx, |thread, cx| {
1930            thread.send_raw("Fetch https://example.com", cx)
1931        });
1932
1933        run_until_first_tool_call(&thread, cx).await;
1934
1935        thread.read_with(cx, |thread, _| {
1936            assert!(matches!(
1937                thread.entries[1],
1938                AgentThreadEntry::ToolCall(ToolCall {
1939                    status: ToolCallStatus::Allowed {
1940                        status: acp::ToolCallStatus::InProgress,
1941                        ..
1942                    },
1943                    ..
1944                })
1945            ));
1946        });
1947
1948        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
1949
1950        thread.read_with(cx, |thread, _| {
1951            assert!(matches!(
1952                &thread.entries[1],
1953                AgentThreadEntry::ToolCall(ToolCall {
1954                    status: ToolCallStatus::Canceled,
1955                    ..
1956                })
1957            ));
1958        });
1959
1960        thread
1961            .update(cx, |thread, cx| {
1962                thread.handle_session_update(
1963                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
1964                        id,
1965                        fields: acp::ToolCallUpdateFields {
1966                            status: Some(acp::ToolCallStatus::Completed),
1967                            ..Default::default()
1968                        },
1969                    }),
1970                    cx,
1971                )
1972            })
1973            .unwrap();
1974
1975        request.await.unwrap();
1976
1977        thread.read_with(cx, |thread, _| {
1978            assert!(matches!(
1979                thread.entries[1],
1980                AgentThreadEntry::ToolCall(ToolCall {
1981                    status: ToolCallStatus::Allowed {
1982                        status: acp::ToolCallStatus::Completed,
1983                        ..
1984                    },
1985                    ..
1986                })
1987            ));
1988        });
1989    }
1990
1991    #[gpui::test]
1992    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
1993        init_test(cx);
1994        let fs = FakeFs::new(cx.background_executor.clone());
1995        fs.insert_tree(path!("/test"), json!({})).await;
1996        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
1997
1998        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
1999            move |_, thread, mut cx| {
2000                async move {
2001                    thread
2002                        .update(&mut cx, |thread, cx| {
2003                            thread.handle_session_update(
2004                                acp::SessionUpdate::ToolCall(acp::ToolCall {
2005                                    id: acp::ToolCallId("test".into()),
2006                                    title: "Label".into(),
2007                                    kind: acp::ToolKind::Edit,
2008                                    status: acp::ToolCallStatus::Completed,
2009                                    content: vec![acp::ToolCallContent::Diff {
2010                                        diff: acp::Diff {
2011                                            path: "/test/test.txt".into(),
2012                                            old_text: None,
2013                                            new_text: "foo".into(),
2014                                        },
2015                                    }],
2016                                    locations: vec![],
2017                                    raw_input: None,
2018                                    raw_output: None,
2019                                }),
2020                                cx,
2021                            )
2022                        })
2023                        .unwrap()
2024                        .unwrap();
2025                    Ok(acp::PromptResponse {
2026                        stop_reason: acp::StopReason::EndTurn,
2027                    })
2028                }
2029                .boxed_local()
2030            }
2031        }));
2032
2033        let thread = cx
2034            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2035            .await
2036            .unwrap();
2037
2038        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
2039            .await
2040            .unwrap();
2041
2042        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
2043    }
2044
2045    #[gpui::test(iterations = 10)]
2046    async fn test_checkpoints(cx: &mut TestAppContext) {
2047        init_test(cx);
2048        let fs = FakeFs::new(cx.background_executor.clone());
2049        fs.insert_tree(
2050            path!("/test"),
2051            json!({
2052                ".git": {}
2053            }),
2054        )
2055        .await;
2056        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
2057
2058        let simulate_changes = Arc::new(AtomicBool::new(true));
2059        let next_filename = Arc::new(AtomicUsize::new(0));
2060        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2061            let simulate_changes = simulate_changes.clone();
2062            let next_filename = next_filename.clone();
2063            let fs = fs.clone();
2064            move |request, thread, mut cx| {
2065                let fs = fs.clone();
2066                let simulate_changes = simulate_changes.clone();
2067                let next_filename = next_filename.clone();
2068                async move {
2069                    if simulate_changes.load(SeqCst) {
2070                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
2071                        fs.write(Path::new(&filename), b"").await?;
2072                    }
2073
2074                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
2075                        panic!("expected text content block");
2076                    };
2077                    thread.update(&mut cx, |thread, cx| {
2078                        thread
2079                            .handle_session_update(
2080                                acp::SessionUpdate::AgentMessageChunk {
2081                                    content: content.text.to_uppercase().into(),
2082                                },
2083                                cx,
2084                            )
2085                            .unwrap();
2086                    })?;
2087                    Ok(acp::PromptResponse {
2088                        stop_reason: acp::StopReason::EndTurn,
2089                    })
2090                }
2091                .boxed_local()
2092            }
2093        }));
2094        let thread = cx
2095            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2096            .await
2097            .unwrap();
2098
2099        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
2100            .await
2101            .unwrap();
2102        thread.read_with(cx, |thread, cx| {
2103            assert_eq!(
2104                thread.to_markdown(cx),
2105                indoc! {"
2106                    ## User (checkpoint)
2107
2108                    Lorem
2109
2110                    ## Assistant
2111
2112                    LOREM
2113
2114                "}
2115            );
2116        });
2117        assert_eq!(fs.files(), vec![Path::new("/test/file-0")]);
2118
2119        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
2120            .await
2121            .unwrap();
2122        thread.read_with(cx, |thread, cx| {
2123            assert_eq!(
2124                thread.to_markdown(cx),
2125                indoc! {"
2126                    ## User (checkpoint)
2127
2128                    Lorem
2129
2130                    ## Assistant
2131
2132                    LOREM
2133
2134                    ## User (checkpoint)
2135
2136                    ipsum
2137
2138                    ## Assistant
2139
2140                    IPSUM
2141
2142                "}
2143            );
2144        });
2145        assert_eq!(
2146            fs.files(),
2147            vec![Path::new("/test/file-0"), Path::new("/test/file-1")]
2148        );
2149
2150        // Checkpoint isn't stored when there are no changes.
2151        simulate_changes.store(false, SeqCst);
2152        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
2153            .await
2154            .unwrap();
2155        thread.read_with(cx, |thread, cx| {
2156            assert_eq!(
2157                thread.to_markdown(cx),
2158                indoc! {"
2159                    ## User (checkpoint)
2160
2161                    Lorem
2162
2163                    ## Assistant
2164
2165                    LOREM
2166
2167                    ## User (checkpoint)
2168
2169                    ipsum
2170
2171                    ## Assistant
2172
2173                    IPSUM
2174
2175                    ## User
2176
2177                    dolor
2178
2179                    ## Assistant
2180
2181                    DOLOR
2182
2183                "}
2184            );
2185        });
2186        assert_eq!(
2187            fs.files(),
2188            vec![Path::new("/test/file-0"), Path::new("/test/file-1")]
2189        );
2190
2191        // Rewinding the conversation truncates the history and restores the checkpoint.
2192        thread
2193            .update(cx, |thread, cx| {
2194                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
2195                    panic!("unexpected entries {:?}", thread.entries)
2196                };
2197                thread.rewind(message.id.clone().unwrap(), cx)
2198            })
2199            .await
2200            .unwrap();
2201        thread.read_with(cx, |thread, cx| {
2202            assert_eq!(
2203                thread.to_markdown(cx),
2204                indoc! {"
2205                    ## User (checkpoint)
2206
2207                    Lorem
2208
2209                    ## Assistant
2210
2211                    LOREM
2212
2213                "}
2214            );
2215        });
2216        assert_eq!(fs.files(), vec![Path::new("/test/file-0")]);
2217    }
2218
2219    async fn run_until_first_tool_call(
2220        thread: &Entity<AcpThread>,
2221        cx: &mut TestAppContext,
2222    ) -> usize {
2223        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
2224
2225        let subscription = cx.update(|cx| {
2226            cx.subscribe(thread, move |thread, _, cx| {
2227                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
2228                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
2229                        return tx.try_send(ix).unwrap();
2230                    }
2231                }
2232            })
2233        });
2234
2235        select! {
2236            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
2237                panic!("Timeout waiting for tool call")
2238            }
2239            ix = rx.next().fuse() => {
2240                drop(subscription);
2241                ix.unwrap()
2242            }
2243        }
2244    }
2245
2246    #[derive(Clone, Default)]
2247    struct FakeAgentConnection {
2248        auth_methods: Vec<acp::AuthMethod>,
2249        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
2250        on_user_message: Option<
2251            Rc<
2252                dyn Fn(
2253                        acp::PromptRequest,
2254                        WeakEntity<AcpThread>,
2255                        AsyncApp,
2256                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2257                    + 'static,
2258            >,
2259        >,
2260    }
2261
2262    impl FakeAgentConnection {
2263        fn new() -> Self {
2264            Self {
2265                auth_methods: Vec::new(),
2266                on_user_message: None,
2267                sessions: Arc::default(),
2268            }
2269        }
2270
2271        #[expect(unused)]
2272        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
2273            self.auth_methods = auth_methods;
2274            self
2275        }
2276
2277        fn on_user_message(
2278            mut self,
2279            handler: impl Fn(
2280                acp::PromptRequest,
2281                WeakEntity<AcpThread>,
2282                AsyncApp,
2283            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2284            + 'static,
2285        ) -> Self {
2286            self.on_user_message.replace(Rc::new(handler));
2287            self
2288        }
2289    }
2290
2291    impl AgentConnection for FakeAgentConnection {
2292        fn auth_methods(&self) -> &[acp::AuthMethod] {
2293            &self.auth_methods
2294        }
2295
2296        fn new_thread(
2297            self: Rc<Self>,
2298            project: Entity<Project>,
2299            _cwd: &Path,
2300            cx: &mut App,
2301        ) -> Task<gpui::Result<Entity<AcpThread>>> {
2302            let session_id = acp::SessionId(
2303                rand::thread_rng()
2304                    .sample_iter(&rand::distributions::Alphanumeric)
2305                    .take(7)
2306                    .map(char::from)
2307                    .collect::<String>()
2308                    .into(),
2309            );
2310            let thread =
2311                cx.new(|cx| AcpThread::new("Test", self.clone(), project, session_id.clone(), cx));
2312            self.sessions.lock().insert(session_id, thread.downgrade());
2313            Task::ready(Ok(thread))
2314        }
2315
2316        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
2317            if self.auth_methods().iter().any(|m| m.id == method) {
2318                Task::ready(Ok(()))
2319            } else {
2320                Task::ready(Err(anyhow!("Invalid Auth Method")))
2321            }
2322        }
2323
2324        fn prompt(
2325            &self,
2326            _id: Option<UserMessageId>,
2327            params: acp::PromptRequest,
2328            cx: &mut App,
2329        ) -> Task<gpui::Result<acp::PromptResponse>> {
2330            let sessions = self.sessions.lock();
2331            let thread = sessions.get(&params.session_id).unwrap();
2332            if let Some(handler) = &self.on_user_message {
2333                let handler = handler.clone();
2334                let thread = thread.clone();
2335                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
2336            } else {
2337                Task::ready(Ok(acp::PromptResponse {
2338                    stop_reason: acp::StopReason::EndTurn,
2339                }))
2340            }
2341        }
2342
2343        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
2344            let sessions = self.sessions.lock();
2345            let thread = sessions.get(&session_id).unwrap().clone();
2346
2347            cx.spawn(async move |cx| {
2348                thread
2349                    .update(cx, |thread, cx| thread.cancel(cx))
2350                    .unwrap()
2351                    .await
2352            })
2353            .detach();
2354        }
2355
2356        fn session_editor(
2357            &self,
2358            session_id: &acp::SessionId,
2359            _cx: &mut App,
2360        ) -> Option<Rc<dyn AgentSessionEditor>> {
2361            Some(Rc::new(FakeAgentSessionEditor {
2362                _session_id: session_id.clone(),
2363            }))
2364        }
2365
2366        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
2367            self
2368        }
2369    }
2370
2371    struct FakeAgentSessionEditor {
2372        _session_id: acp::SessionId,
2373    }
2374
2375    impl AgentSessionEditor for FakeAgentSessionEditor {
2376        fn truncate(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
2377            Task::ready(Ok(()))
2378        }
2379    }
2380}