acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5
   6pub use connection::*;
   7pub use diff::*;
   8pub use mention::*;
   9pub use terminal::*;
  10
  11use action_log::ActionLog;
  12use agent_client_protocol as acp;
  13use anyhow::{Context as _, Result, anyhow};
  14use editor::Bias;
  15use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  16use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  17use itertools::Itertools;
  18use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  19use markdown::Markdown;
  20use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  21use std::collections::HashMap;
  22use std::error::Error;
  23use std::fmt::{Formatter, Write};
  24use std::ops::Range;
  25use std::process::ExitStatus;
  26use std::rc::Rc;
  27use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  28use ui::App;
  29use util::ResultExt;
  30
  31#[derive(Debug)]
  32pub struct UserMessage {
  33    pub id: Option<UserMessageId>,
  34    pub content: ContentBlock,
  35    pub chunks: Vec<acp::ContentBlock>,
  36    pub checkpoint: Option<Checkpoint>,
  37}
  38
  39#[derive(Debug)]
  40pub struct Checkpoint {
  41    git_checkpoint: GitStoreCheckpoint,
  42    pub show: bool,
  43}
  44
  45impl UserMessage {
  46    fn to_markdown(&self, cx: &App) -> String {
  47        let mut markdown = String::new();
  48        if self
  49            .checkpoint
  50            .as_ref()
  51            .map_or(false, |checkpoint| checkpoint.show)
  52        {
  53            writeln!(markdown, "## User (checkpoint)").unwrap();
  54        } else {
  55            writeln!(markdown, "## User").unwrap();
  56        }
  57        writeln!(markdown).unwrap();
  58        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
  59        writeln!(markdown).unwrap();
  60        markdown
  61    }
  62}
  63
  64#[derive(Debug, PartialEq)]
  65pub struct AssistantMessage {
  66    pub chunks: Vec<AssistantMessageChunk>,
  67}
  68
  69impl AssistantMessage {
  70    pub fn to_markdown(&self, cx: &App) -> String {
  71        format!(
  72            "## Assistant\n\n{}\n\n",
  73            self.chunks
  74                .iter()
  75                .map(|chunk| chunk.to_markdown(cx))
  76                .join("\n\n")
  77        )
  78    }
  79}
  80
  81#[derive(Debug, PartialEq)]
  82pub enum AssistantMessageChunk {
  83    Message { block: ContentBlock },
  84    Thought { block: ContentBlock },
  85}
  86
  87impl AssistantMessageChunk {
  88    pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
  89        Self::Message {
  90            block: ContentBlock::new(chunk.into(), language_registry, cx),
  91        }
  92    }
  93
  94    fn to_markdown(&self, cx: &App) -> String {
  95        match self {
  96            Self::Message { block } => block.to_markdown(cx).to_string(),
  97            Self::Thought { block } => {
  98                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
  99            }
 100        }
 101    }
 102}
 103
 104#[derive(Debug)]
 105pub enum AgentThreadEntry {
 106    UserMessage(UserMessage),
 107    AssistantMessage(AssistantMessage),
 108    ToolCall(ToolCall),
 109}
 110
 111impl AgentThreadEntry {
 112    pub fn to_markdown(&self, cx: &App) -> String {
 113        match self {
 114            Self::UserMessage(message) => message.to_markdown(cx),
 115            Self::AssistantMessage(message) => message.to_markdown(cx),
 116            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 117        }
 118    }
 119
 120    pub fn user_message(&self) -> Option<&UserMessage> {
 121        if let AgentThreadEntry::UserMessage(message) = self {
 122            Some(message)
 123        } else {
 124            None
 125        }
 126    }
 127
 128    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 129        if let AgentThreadEntry::ToolCall(call) = self {
 130            itertools::Either::Left(call.diffs())
 131        } else {
 132            itertools::Either::Right(std::iter::empty())
 133        }
 134    }
 135
 136    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 137        if let AgentThreadEntry::ToolCall(call) = self {
 138            itertools::Either::Left(call.terminals())
 139        } else {
 140            itertools::Either::Right(std::iter::empty())
 141        }
 142    }
 143
 144    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 145        if let AgentThreadEntry::ToolCall(ToolCall {
 146            locations,
 147            resolved_locations,
 148            ..
 149        }) = self
 150        {
 151            Some((
 152                locations.get(ix)?.clone(),
 153                resolved_locations.get(ix)?.clone()?,
 154            ))
 155        } else {
 156            None
 157        }
 158    }
 159}
 160
 161#[derive(Debug)]
 162pub struct ToolCall {
 163    pub id: acp::ToolCallId,
 164    pub label: Entity<Markdown>,
 165    pub kind: acp::ToolKind,
 166    pub content: Vec<ToolCallContent>,
 167    pub status: ToolCallStatus,
 168    pub locations: Vec<acp::ToolCallLocation>,
 169    pub resolved_locations: Vec<Option<AgentLocation>>,
 170    pub raw_input: Option<serde_json::Value>,
 171    pub raw_output: Option<serde_json::Value>,
 172}
 173
 174impl ToolCall {
 175    fn from_acp(
 176        tool_call: acp::ToolCall,
 177        status: ToolCallStatus,
 178        language_registry: Arc<LanguageRegistry>,
 179        cx: &mut App,
 180    ) -> Self {
 181        Self {
 182            id: tool_call.id,
 183            label: cx.new(|cx| {
 184                Markdown::new(
 185                    tool_call.title.into(),
 186                    Some(language_registry.clone()),
 187                    None,
 188                    cx,
 189                )
 190            }),
 191            kind: tool_call.kind,
 192            content: tool_call
 193                .content
 194                .into_iter()
 195                .map(|content| ToolCallContent::from_acp(content, language_registry.clone(), cx))
 196                .collect(),
 197            locations: tool_call.locations,
 198            resolved_locations: Vec::default(),
 199            status,
 200            raw_input: tool_call.raw_input,
 201            raw_output: tool_call.raw_output,
 202        }
 203    }
 204
 205    fn update_fields(
 206        &mut self,
 207        fields: acp::ToolCallUpdateFields,
 208        language_registry: Arc<LanguageRegistry>,
 209        cx: &mut App,
 210    ) {
 211        let acp::ToolCallUpdateFields {
 212            kind,
 213            status,
 214            title,
 215            content,
 216            locations,
 217            raw_input,
 218            raw_output,
 219        } = fields;
 220
 221        if let Some(kind) = kind {
 222            self.kind = kind;
 223        }
 224
 225        if let Some(status) = status {
 226            self.status = ToolCallStatus::Allowed { status };
 227        }
 228
 229        if let Some(title) = title {
 230            self.label.update(cx, |label, cx| {
 231                label.replace(title, cx);
 232            });
 233        }
 234
 235        if let Some(content) = content {
 236            self.content = content
 237                .into_iter()
 238                .map(|chunk| ToolCallContent::from_acp(chunk, language_registry.clone(), cx))
 239                .collect();
 240        }
 241
 242        if let Some(locations) = locations {
 243            self.locations = locations;
 244        }
 245
 246        if let Some(raw_input) = raw_input {
 247            self.raw_input = Some(raw_input);
 248        }
 249
 250        if let Some(raw_output) = raw_output {
 251            if self.content.is_empty() {
 252                if let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 253                {
 254                    self.content
 255                        .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 256                            markdown,
 257                        }));
 258                }
 259            }
 260            self.raw_output = Some(raw_output);
 261        }
 262    }
 263
 264    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 265        self.content.iter().filter_map(|content| match content {
 266            ToolCallContent::Diff(diff) => Some(diff),
 267            ToolCallContent::ContentBlock(_) => None,
 268            ToolCallContent::Terminal(_) => None,
 269        })
 270    }
 271
 272    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 273        self.content.iter().filter_map(|content| match content {
 274            ToolCallContent::Terminal(terminal) => Some(terminal),
 275            ToolCallContent::ContentBlock(_) => None,
 276            ToolCallContent::Diff(_) => None,
 277        })
 278    }
 279
 280    fn to_markdown(&self, cx: &App) -> String {
 281        let mut markdown = format!(
 282            "**Tool Call: {}**\nStatus: {}\n\n",
 283            self.label.read(cx).source(),
 284            self.status
 285        );
 286        for content in &self.content {
 287            markdown.push_str(content.to_markdown(cx).as_str());
 288            markdown.push_str("\n\n");
 289        }
 290        markdown
 291    }
 292
 293    async fn resolve_location(
 294        location: acp::ToolCallLocation,
 295        project: WeakEntity<Project>,
 296        cx: &mut AsyncApp,
 297    ) -> Option<AgentLocation> {
 298        let buffer = project
 299            .update(cx, |project, cx| {
 300                if let Some(path) = project.project_path_for_absolute_path(&location.path, cx) {
 301                    Some(project.open_buffer(path, cx))
 302                } else {
 303                    None
 304                }
 305            })
 306            .ok()??;
 307        let buffer = buffer.await.log_err()?;
 308        let position = buffer
 309            .update(cx, |buffer, _| {
 310                if let Some(row) = location.line {
 311                    let snapshot = buffer.snapshot();
 312                    let column = snapshot.indent_size_for_line(row).len;
 313                    let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 314                    snapshot.anchor_before(point)
 315                } else {
 316                    Anchor::MIN
 317                }
 318            })
 319            .ok()?;
 320
 321        Some(AgentLocation {
 322            buffer: buffer.downgrade(),
 323            position,
 324        })
 325    }
 326
 327    fn resolve_locations(
 328        &self,
 329        project: Entity<Project>,
 330        cx: &mut App,
 331    ) -> Task<Vec<Option<AgentLocation>>> {
 332        let locations = self.locations.clone();
 333        project.update(cx, |_, cx| {
 334            cx.spawn(async move |project, cx| {
 335                let mut new_locations = Vec::new();
 336                for location in locations {
 337                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 338                }
 339                new_locations
 340            })
 341        })
 342    }
 343}
 344
 345#[derive(Debug)]
 346pub enum ToolCallStatus {
 347    WaitingForConfirmation {
 348        options: Vec<acp::PermissionOption>,
 349        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
 350    },
 351    Allowed {
 352        status: acp::ToolCallStatus,
 353    },
 354    Rejected,
 355    Canceled,
 356}
 357
 358impl Display for ToolCallStatus {
 359    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 360        write!(
 361            f,
 362            "{}",
 363            match self {
 364                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 365                ToolCallStatus::Allowed { status } => match status {
 366                    acp::ToolCallStatus::Pending => "Pending",
 367                    acp::ToolCallStatus::InProgress => "In Progress",
 368                    acp::ToolCallStatus::Completed => "Completed",
 369                    acp::ToolCallStatus::Failed => "Failed",
 370                },
 371                ToolCallStatus::Rejected => "Rejected",
 372                ToolCallStatus::Canceled => "Canceled",
 373            }
 374        )
 375    }
 376}
 377
 378#[derive(Debug, PartialEq, Clone)]
 379pub enum ContentBlock {
 380    Empty,
 381    Markdown { markdown: Entity<Markdown> },
 382    ResourceLink { resource_link: acp::ResourceLink },
 383}
 384
 385impl ContentBlock {
 386    pub fn new(
 387        block: acp::ContentBlock,
 388        language_registry: &Arc<LanguageRegistry>,
 389        cx: &mut App,
 390    ) -> Self {
 391        let mut this = Self::Empty;
 392        this.append(block, language_registry, cx);
 393        this
 394    }
 395
 396    pub fn new_combined(
 397        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 398        language_registry: Arc<LanguageRegistry>,
 399        cx: &mut App,
 400    ) -> Self {
 401        let mut this = Self::Empty;
 402        for block in blocks {
 403            this.append(block, &language_registry, cx);
 404        }
 405        this
 406    }
 407
 408    pub fn append(
 409        &mut self,
 410        block: acp::ContentBlock,
 411        language_registry: &Arc<LanguageRegistry>,
 412        cx: &mut App,
 413    ) {
 414        if matches!(self, ContentBlock::Empty) {
 415            if let acp::ContentBlock::ResourceLink(resource_link) = block {
 416                *self = ContentBlock::ResourceLink { resource_link };
 417                return;
 418            }
 419        }
 420
 421        let new_content = self.block_string_contents(block);
 422
 423        match self {
 424            ContentBlock::Empty => {
 425                *self = Self::create_markdown_block(new_content, language_registry, cx);
 426            }
 427            ContentBlock::Markdown { markdown } => {
 428                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 429            }
 430            ContentBlock::ResourceLink { resource_link } => {
 431                let existing_content = Self::resource_link_md(&resource_link.uri);
 432                let combined = format!("{}\n{}", existing_content, new_content);
 433
 434                *self = Self::create_markdown_block(combined, language_registry, cx);
 435            }
 436        }
 437    }
 438
 439    fn create_markdown_block(
 440        content: String,
 441        language_registry: &Arc<LanguageRegistry>,
 442        cx: &mut App,
 443    ) -> ContentBlock {
 444        ContentBlock::Markdown {
 445            markdown: cx
 446                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 447        }
 448    }
 449
 450    fn block_string_contents(&self, block: acp::ContentBlock) -> String {
 451        match block {
 452            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 453            acp::ContentBlock::ResourceLink(resource_link) => {
 454                Self::resource_link_md(&resource_link.uri)
 455            }
 456            acp::ContentBlock::Resource(acp::EmbeddedResource {
 457                resource:
 458                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 459                        uri,
 460                        ..
 461                    }),
 462                ..
 463            }) => Self::resource_link_md(&uri),
 464            acp::ContentBlock::Image(image) => Self::image_md(&image),
 465            acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
 466        }
 467    }
 468
 469    fn resource_link_md(uri: &str) -> String {
 470        if let Some(uri) = MentionUri::parse(&uri).log_err() {
 471            uri.as_link().to_string()
 472        } else {
 473            uri.to_string()
 474        }
 475    }
 476
 477    fn image_md(_image: &acp::ImageContent) -> String {
 478        "`Image`".into()
 479    }
 480
 481    fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 482        match self {
 483            ContentBlock::Empty => "",
 484            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 485            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 486        }
 487    }
 488
 489    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 490        match self {
 491            ContentBlock::Empty => None,
 492            ContentBlock::Markdown { markdown } => Some(markdown),
 493            ContentBlock::ResourceLink { .. } => None,
 494        }
 495    }
 496
 497    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 498        match self {
 499            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 500            _ => None,
 501        }
 502    }
 503}
 504
 505#[derive(Debug)]
 506pub enum ToolCallContent {
 507    ContentBlock(ContentBlock),
 508    Diff(Entity<Diff>),
 509    Terminal(Entity<Terminal>),
 510}
 511
 512impl ToolCallContent {
 513    pub fn from_acp(
 514        content: acp::ToolCallContent,
 515        language_registry: Arc<LanguageRegistry>,
 516        cx: &mut App,
 517    ) -> Self {
 518        match content {
 519            acp::ToolCallContent::Content { content } => {
 520                Self::ContentBlock(ContentBlock::new(content, &language_registry, cx))
 521            }
 522            acp::ToolCallContent::Diff { diff } => {
 523                Self::Diff(cx.new(|cx| Diff::from_acp(diff, language_registry, cx)))
 524            }
 525        }
 526    }
 527
 528    pub fn to_markdown(&self, cx: &App) -> String {
 529        match self {
 530            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 531            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 532            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 533        }
 534    }
 535}
 536
 537#[derive(Debug, PartialEq)]
 538pub enum ToolCallUpdate {
 539    UpdateFields(acp::ToolCallUpdate),
 540    UpdateDiff(ToolCallUpdateDiff),
 541    UpdateTerminal(ToolCallUpdateTerminal),
 542}
 543
 544impl ToolCallUpdate {
 545    fn id(&self) -> &acp::ToolCallId {
 546        match self {
 547            Self::UpdateFields(update) => &update.id,
 548            Self::UpdateDiff(diff) => &diff.id,
 549            Self::UpdateTerminal(terminal) => &terminal.id,
 550        }
 551    }
 552}
 553
 554impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 555    fn from(update: acp::ToolCallUpdate) -> Self {
 556        Self::UpdateFields(update)
 557    }
 558}
 559
 560impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 561    fn from(diff: ToolCallUpdateDiff) -> Self {
 562        Self::UpdateDiff(diff)
 563    }
 564}
 565
 566#[derive(Debug, PartialEq)]
 567pub struct ToolCallUpdateDiff {
 568    pub id: acp::ToolCallId,
 569    pub diff: Entity<Diff>,
 570}
 571
 572impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 573    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 574        Self::UpdateTerminal(terminal)
 575    }
 576}
 577
 578#[derive(Debug, PartialEq)]
 579pub struct ToolCallUpdateTerminal {
 580    pub id: acp::ToolCallId,
 581    pub terminal: Entity<Terminal>,
 582}
 583
 584#[derive(Debug, Default)]
 585pub struct Plan {
 586    pub entries: Vec<PlanEntry>,
 587}
 588
 589#[derive(Debug)]
 590pub struct PlanStats<'a> {
 591    pub in_progress_entry: Option<&'a PlanEntry>,
 592    pub pending: u32,
 593    pub completed: u32,
 594}
 595
 596impl Plan {
 597    pub fn is_empty(&self) -> bool {
 598        self.entries.is_empty()
 599    }
 600
 601    pub fn stats(&self) -> PlanStats<'_> {
 602        let mut stats = PlanStats {
 603            in_progress_entry: None,
 604            pending: 0,
 605            completed: 0,
 606        };
 607
 608        for entry in &self.entries {
 609            match &entry.status {
 610                acp::PlanEntryStatus::Pending => {
 611                    stats.pending += 1;
 612                }
 613                acp::PlanEntryStatus::InProgress => {
 614                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 615                }
 616                acp::PlanEntryStatus::Completed => {
 617                    stats.completed += 1;
 618                }
 619            }
 620        }
 621
 622        stats
 623    }
 624}
 625
 626#[derive(Debug)]
 627pub struct PlanEntry {
 628    pub content: Entity<Markdown>,
 629    pub priority: acp::PlanEntryPriority,
 630    pub status: acp::PlanEntryStatus,
 631}
 632
 633impl PlanEntry {
 634    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 635        Self {
 636            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 637            priority: entry.priority,
 638            status: entry.status,
 639        }
 640    }
 641}
 642
 643pub struct AcpThread {
 644    title: SharedString,
 645    entries: Vec<AgentThreadEntry>,
 646    plan: Plan,
 647    project: Entity<Project>,
 648    action_log: Entity<ActionLog>,
 649    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
 650    send_task: Option<Task<()>>,
 651    connection: Rc<dyn AgentConnection>,
 652    session_id: acp::SessionId,
 653}
 654
 655pub enum AcpThreadEvent {
 656    NewEntry,
 657    EntryUpdated(usize),
 658    EntriesRemoved(Range<usize>),
 659    ToolAuthorizationRequired,
 660    Stopped,
 661    Error,
 662    ServerExited(ExitStatus),
 663}
 664
 665impl EventEmitter<AcpThreadEvent> for AcpThread {}
 666
 667#[derive(PartialEq, Eq)]
 668pub enum ThreadStatus {
 669    Idle,
 670    WaitingForToolConfirmation,
 671    Generating,
 672}
 673
 674#[derive(Debug, Clone)]
 675pub enum LoadError {
 676    Unsupported {
 677        error_message: SharedString,
 678        upgrade_message: SharedString,
 679        upgrade_command: String,
 680    },
 681    Exited(i32),
 682    Other(SharedString),
 683}
 684
 685impl Display for LoadError {
 686    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 687        match self {
 688            LoadError::Unsupported { error_message, .. } => write!(f, "{}", error_message),
 689            LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
 690            LoadError::Other(msg) => write!(f, "{}", msg),
 691        }
 692    }
 693}
 694
 695impl Error for LoadError {}
 696
 697impl AcpThread {
 698    pub fn new(
 699        title: impl Into<SharedString>,
 700        connection: Rc<dyn AgentConnection>,
 701        project: Entity<Project>,
 702        session_id: acp::SessionId,
 703        cx: &mut Context<Self>,
 704    ) -> Self {
 705        let action_log = cx.new(|_| ActionLog::new(project.clone()));
 706
 707        Self {
 708            action_log,
 709            shared_buffers: Default::default(),
 710            entries: Default::default(),
 711            plan: Default::default(),
 712            title: title.into(),
 713            project,
 714            send_task: None,
 715            connection,
 716            session_id,
 717        }
 718    }
 719
 720    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
 721        &self.connection
 722    }
 723
 724    pub fn action_log(&self) -> &Entity<ActionLog> {
 725        &self.action_log
 726    }
 727
 728    pub fn project(&self) -> &Entity<Project> {
 729        &self.project
 730    }
 731
 732    pub fn title(&self) -> SharedString {
 733        self.title.clone()
 734    }
 735
 736    pub fn entries(&self) -> &[AgentThreadEntry] {
 737        &self.entries
 738    }
 739
 740    pub fn session_id(&self) -> &acp::SessionId {
 741        &self.session_id
 742    }
 743
 744    pub fn status(&self) -> ThreadStatus {
 745        if self.send_task.is_some() {
 746            if self.waiting_for_tool_confirmation() {
 747                ThreadStatus::WaitingForToolConfirmation
 748            } else {
 749                ThreadStatus::Generating
 750            }
 751        } else {
 752            ThreadStatus::Idle
 753        }
 754    }
 755
 756    pub fn has_pending_edit_tool_calls(&self) -> bool {
 757        for entry in self.entries.iter().rev() {
 758            match entry {
 759                AgentThreadEntry::UserMessage(_) => return false,
 760                AgentThreadEntry::ToolCall(
 761                    call @ ToolCall {
 762                        status:
 763                            ToolCallStatus::Allowed {
 764                                status:
 765                                    acp::ToolCallStatus::InProgress | acp::ToolCallStatus::Pending,
 766                            },
 767                        ..
 768                    },
 769                ) if call.diffs().next().is_some() => {
 770                    return true;
 771                }
 772                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
 773            }
 774        }
 775
 776        false
 777    }
 778
 779    pub fn used_tools_since_last_user_message(&self) -> bool {
 780        for entry in self.entries.iter().rev() {
 781            match entry {
 782                AgentThreadEntry::UserMessage(..) => return false,
 783                AgentThreadEntry::AssistantMessage(..) => continue,
 784                AgentThreadEntry::ToolCall(..) => return true,
 785            }
 786        }
 787
 788        false
 789    }
 790
 791    pub fn handle_session_update(
 792        &mut self,
 793        update: acp::SessionUpdate,
 794        cx: &mut Context<Self>,
 795    ) -> Result<()> {
 796        match update {
 797            acp::SessionUpdate::UserMessageChunk { content } => {
 798                self.push_user_content_block(None, content, cx);
 799            }
 800            acp::SessionUpdate::AgentMessageChunk { content } => {
 801                self.push_assistant_content_block(content, false, cx);
 802            }
 803            acp::SessionUpdate::AgentThoughtChunk { content } => {
 804                self.push_assistant_content_block(content, true, cx);
 805            }
 806            acp::SessionUpdate::ToolCall(tool_call) => {
 807                self.upsert_tool_call(tool_call, cx);
 808            }
 809            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
 810                self.update_tool_call(tool_call_update, cx)?;
 811            }
 812            acp::SessionUpdate::Plan(plan) => {
 813                self.update_plan(plan, cx);
 814            }
 815        }
 816        Ok(())
 817    }
 818
 819    pub fn push_user_content_block(
 820        &mut self,
 821        message_id: Option<UserMessageId>,
 822        chunk: acp::ContentBlock,
 823        cx: &mut Context<Self>,
 824    ) {
 825        let language_registry = self.project.read(cx).languages().clone();
 826        let entries_len = self.entries.len();
 827
 828        if let Some(last_entry) = self.entries.last_mut()
 829            && let AgentThreadEntry::UserMessage(UserMessage {
 830                id,
 831                content,
 832                chunks,
 833                ..
 834            }) = last_entry
 835        {
 836            *id = message_id.or(id.take());
 837            content.append(chunk.clone(), &language_registry, cx);
 838            chunks.push(chunk);
 839            let idx = entries_len - 1;
 840            cx.emit(AcpThreadEvent::EntryUpdated(idx));
 841        } else {
 842            let content = ContentBlock::new(chunk.clone(), &language_registry, cx);
 843            self.push_entry(
 844                AgentThreadEntry::UserMessage(UserMessage {
 845                    id: message_id,
 846                    content,
 847                    chunks: vec![chunk],
 848                    checkpoint: None,
 849                }),
 850                cx,
 851            );
 852        }
 853    }
 854
 855    pub fn push_assistant_content_block(
 856        &mut self,
 857        chunk: acp::ContentBlock,
 858        is_thought: bool,
 859        cx: &mut Context<Self>,
 860    ) {
 861        let language_registry = self.project.read(cx).languages().clone();
 862        let entries_len = self.entries.len();
 863        if let Some(last_entry) = self.entries.last_mut()
 864            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
 865        {
 866            let idx = entries_len - 1;
 867            cx.emit(AcpThreadEvent::EntryUpdated(idx));
 868            match (chunks.last_mut(), is_thought) {
 869                (Some(AssistantMessageChunk::Message { block }), false)
 870                | (Some(AssistantMessageChunk::Thought { block }), true) => {
 871                    block.append(chunk, &language_registry, cx)
 872                }
 873                _ => {
 874                    let block = ContentBlock::new(chunk, &language_registry, cx);
 875                    if is_thought {
 876                        chunks.push(AssistantMessageChunk::Thought { block })
 877                    } else {
 878                        chunks.push(AssistantMessageChunk::Message { block })
 879                    }
 880                }
 881            }
 882        } else {
 883            let block = ContentBlock::new(chunk, &language_registry, cx);
 884            let chunk = if is_thought {
 885                AssistantMessageChunk::Thought { block }
 886            } else {
 887                AssistantMessageChunk::Message { block }
 888            };
 889
 890            self.push_entry(
 891                AgentThreadEntry::AssistantMessage(AssistantMessage {
 892                    chunks: vec![chunk],
 893                }),
 894                cx,
 895            );
 896        }
 897    }
 898
 899    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
 900        self.entries.push(entry);
 901        cx.emit(AcpThreadEvent::NewEntry);
 902    }
 903
 904    pub fn update_tool_call(
 905        &mut self,
 906        update: impl Into<ToolCallUpdate>,
 907        cx: &mut Context<Self>,
 908    ) -> Result<()> {
 909        let update = update.into();
 910        let languages = self.project.read(cx).languages().clone();
 911
 912        let (ix, current_call) = self
 913            .tool_call_mut(update.id())
 914            .context("Tool call not found")?;
 915        match update {
 916            ToolCallUpdate::UpdateFields(update) => {
 917                let location_updated = update.fields.locations.is_some();
 918                current_call.update_fields(update.fields, languages, cx);
 919                if location_updated {
 920                    self.resolve_locations(update.id.clone(), cx);
 921                }
 922            }
 923            ToolCallUpdate::UpdateDiff(update) => {
 924                current_call.content.clear();
 925                current_call
 926                    .content
 927                    .push(ToolCallContent::Diff(update.diff));
 928            }
 929            ToolCallUpdate::UpdateTerminal(update) => {
 930                current_call.content.clear();
 931                current_call
 932                    .content
 933                    .push(ToolCallContent::Terminal(update.terminal));
 934            }
 935        }
 936
 937        cx.emit(AcpThreadEvent::EntryUpdated(ix));
 938
 939        Ok(())
 940    }
 941
 942    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
 943    pub fn upsert_tool_call(&mut self, tool_call: acp::ToolCall, cx: &mut Context<Self>) {
 944        let status = ToolCallStatus::Allowed {
 945            status: tool_call.status,
 946        };
 947        self.upsert_tool_call_inner(tool_call, status, cx)
 948    }
 949
 950    pub fn upsert_tool_call_inner(
 951        &mut self,
 952        tool_call: acp::ToolCall,
 953        status: ToolCallStatus,
 954        cx: &mut Context<Self>,
 955    ) {
 956        let language_registry = self.project.read(cx).languages().clone();
 957        let call = ToolCall::from_acp(tool_call, status, language_registry, cx);
 958        let id = call.id.clone();
 959
 960        if let Some((ix, current_call)) = self.tool_call_mut(&call.id) {
 961            *current_call = call;
 962
 963            cx.emit(AcpThreadEvent::EntryUpdated(ix));
 964        } else {
 965            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
 966        };
 967
 968        self.resolve_locations(id, cx);
 969    }
 970
 971    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
 972        // The tool call we are looking for is typically the last one, or very close to the end.
 973        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
 974        self.entries
 975            .iter_mut()
 976            .enumerate()
 977            .rev()
 978            .find_map(|(index, tool_call)| {
 979                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
 980                    && &tool_call.id == id
 981                {
 982                    Some((index, tool_call))
 983                } else {
 984                    None
 985                }
 986            })
 987    }
 988
 989    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
 990        let project = self.project.clone();
 991        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
 992            return;
 993        };
 994        let task = tool_call.resolve_locations(project, cx);
 995        cx.spawn(async move |this, cx| {
 996            let resolved_locations = task.await;
 997            this.update(cx, |this, cx| {
 998                let project = this.project.clone();
 999                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1000                    return;
1001                };
1002                if let Some(Some(location)) = resolved_locations.last() {
1003                    project.update(cx, |project, cx| {
1004                        if let Some(agent_location) = project.agent_location() {
1005                            let should_ignore = agent_location.buffer == location.buffer
1006                                && location
1007                                    .buffer
1008                                    .update(cx, |buffer, _| {
1009                                        let snapshot = buffer.snapshot();
1010                                        let old_position =
1011                                            agent_location.position.to_point(&snapshot);
1012                                        let new_position = location.position.to_point(&snapshot);
1013                                        // ignore this so that when we get updates from the edit tool
1014                                        // the position doesn't reset to the startof line
1015                                        old_position.row == new_position.row
1016                                            && old_position.column > new_position.column
1017                                    })
1018                                    .ok()
1019                                    .unwrap_or_default();
1020                            if !should_ignore {
1021                                project.set_agent_location(Some(location.clone()), cx);
1022                            }
1023                        }
1024                    });
1025                }
1026                if tool_call.resolved_locations != resolved_locations {
1027                    tool_call.resolved_locations = resolved_locations;
1028                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1029                }
1030            })
1031        })
1032        .detach();
1033    }
1034
1035    pub fn request_tool_call_authorization(
1036        &mut self,
1037        tool_call: acp::ToolCall,
1038        options: Vec<acp::PermissionOption>,
1039        cx: &mut Context<Self>,
1040    ) -> oneshot::Receiver<acp::PermissionOptionId> {
1041        let (tx, rx) = oneshot::channel();
1042
1043        let status = ToolCallStatus::WaitingForConfirmation {
1044            options,
1045            respond_tx: tx,
1046        };
1047
1048        self.upsert_tool_call_inner(tool_call, status, cx);
1049        cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1050        rx
1051    }
1052
1053    pub fn authorize_tool_call(
1054        &mut self,
1055        id: acp::ToolCallId,
1056        option_id: acp::PermissionOptionId,
1057        option_kind: acp::PermissionOptionKind,
1058        cx: &mut Context<Self>,
1059    ) {
1060        let Some((ix, call)) = self.tool_call_mut(&id) else {
1061            return;
1062        };
1063
1064        let new_status = match option_kind {
1065            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1066                ToolCallStatus::Rejected
1067            }
1068            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1069                ToolCallStatus::Allowed {
1070                    status: acp::ToolCallStatus::InProgress,
1071                }
1072            }
1073        };
1074
1075        let curr_status = mem::replace(&mut call.status, new_status);
1076
1077        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1078            respond_tx.send(option_id).log_err();
1079        } else if cfg!(debug_assertions) {
1080            panic!("tried to authorize an already authorized tool call");
1081        }
1082
1083        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1084    }
1085
1086    /// Returns true if the last turn is awaiting tool authorization
1087    pub fn waiting_for_tool_confirmation(&self) -> bool {
1088        for entry in self.entries.iter().rev() {
1089            match &entry {
1090                AgentThreadEntry::ToolCall(call) => match call.status {
1091                    ToolCallStatus::WaitingForConfirmation { .. } => return true,
1092                    ToolCallStatus::Allowed { .. }
1093                    | ToolCallStatus::Rejected
1094                    | ToolCallStatus::Canceled => continue,
1095                },
1096                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1097                    // Reached the beginning of the turn
1098                    return false;
1099                }
1100            }
1101        }
1102        false
1103    }
1104
1105    pub fn plan(&self) -> &Plan {
1106        &self.plan
1107    }
1108
1109    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1110        let new_entries_len = request.entries.len();
1111        let mut new_entries = request.entries.into_iter();
1112
1113        // Reuse existing markdown to prevent flickering
1114        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1115            let PlanEntry {
1116                content,
1117                priority,
1118                status,
1119            } = old;
1120            content.update(cx, |old, cx| {
1121                old.replace(new.content, cx);
1122            });
1123            *priority = new.priority;
1124            *status = new.status;
1125        }
1126        for new in new_entries {
1127            self.plan.entries.push(PlanEntry::from_acp(new, cx))
1128        }
1129        self.plan.entries.truncate(new_entries_len);
1130
1131        cx.notify();
1132    }
1133
1134    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1135        self.plan
1136            .entries
1137            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1138        cx.notify();
1139    }
1140
1141    #[cfg(any(test, feature = "test-support"))]
1142    pub fn send_raw(
1143        &mut self,
1144        message: &str,
1145        cx: &mut Context<Self>,
1146    ) -> BoxFuture<'static, Result<()>> {
1147        self.send(
1148            vec![acp::ContentBlock::Text(acp::TextContent {
1149                text: message.to_string(),
1150                annotations: None,
1151            })],
1152            cx,
1153        )
1154    }
1155
1156    pub fn send(
1157        &mut self,
1158        message: Vec<acp::ContentBlock>,
1159        cx: &mut Context<Self>,
1160    ) -> BoxFuture<'static, Result<()>> {
1161        let block = ContentBlock::new_combined(
1162            message.clone(),
1163            self.project.read(cx).languages().clone(),
1164            cx,
1165        );
1166        let request = acp::PromptRequest {
1167            prompt: message.clone(),
1168            session_id: self.session_id.clone(),
1169        };
1170        let git_store = self.project.read(cx).git_store().clone();
1171
1172        let message_id = if self
1173            .connection
1174            .session_editor(&self.session_id, cx)
1175            .is_some()
1176        {
1177            Some(UserMessageId::new())
1178        } else {
1179            None
1180        };
1181        self.push_entry(
1182            AgentThreadEntry::UserMessage(UserMessage {
1183                id: message_id.clone(),
1184                content: block,
1185                chunks: message,
1186                checkpoint: None,
1187            }),
1188            cx,
1189        );
1190
1191        self.run_turn(cx, async move |this, cx| {
1192            let old_checkpoint = git_store
1193                .update(cx, |git, cx| git.checkpoint(cx))?
1194                .await
1195                .context("failed to get old checkpoint")
1196                .log_err();
1197            this.update(cx, |this, cx| {
1198                if let Some((_ix, message)) = this.last_user_message() {
1199                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1200                        git_checkpoint,
1201                        show: false,
1202                    });
1203                }
1204                this.connection.prompt(message_id, request, cx)
1205            })?
1206            .await
1207        })
1208    }
1209
1210    pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1211        self.run_turn(cx, async move |this, cx| {
1212            this.update(cx, |this, cx| {
1213                this.connection
1214                    .resume(&this.session_id, cx)
1215                    .map(|resume| resume.run(cx))
1216            })?
1217            .context("resuming a session is not supported")?
1218            .await
1219        })
1220    }
1221
1222    fn run_turn(
1223        &mut self,
1224        cx: &mut Context<Self>,
1225        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1226    ) -> BoxFuture<'static, Result<()>> {
1227        self.clear_completed_plan_entries(cx);
1228
1229        let (tx, rx) = oneshot::channel();
1230        let cancel_task = self.cancel(cx);
1231
1232        self.send_task = Some(cx.spawn(async move |this, cx| {
1233            cancel_task.await;
1234            tx.send(f(this, cx).await).ok();
1235        }));
1236
1237        cx.spawn(async move |this, cx| {
1238            let response = rx.await;
1239
1240            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1241                .await?;
1242
1243            this.update(cx, |this, cx| {
1244                match response {
1245                    Ok(Err(e)) => {
1246                        this.send_task.take();
1247                        cx.emit(AcpThreadEvent::Error);
1248                        Err(e)
1249                    }
1250                    result => {
1251                        let cancelled = matches!(
1252                            result,
1253                            Ok(Ok(acp::PromptResponse {
1254                                stop_reason: acp::StopReason::Cancelled
1255                            }))
1256                        );
1257
1258                        // We only take the task if the current prompt wasn't cancelled.
1259                        //
1260                        // This prompt may have been cancelled because another one was sent
1261                        // while it was still generating. In these cases, dropping `send_task`
1262                        // would cause the next generation to be cancelled.
1263                        if !cancelled {
1264                            this.send_task.take();
1265                        }
1266
1267                        cx.emit(AcpThreadEvent::Stopped);
1268                        Ok(())
1269                    }
1270                }
1271            })?
1272        })
1273        .boxed()
1274    }
1275
1276    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1277        let Some(send_task) = self.send_task.take() else {
1278            return Task::ready(());
1279        };
1280
1281        for entry in self.entries.iter_mut() {
1282            if let AgentThreadEntry::ToolCall(call) = entry {
1283                let cancel = matches!(
1284                    call.status,
1285                    ToolCallStatus::WaitingForConfirmation { .. }
1286                        | ToolCallStatus::Allowed {
1287                            status: acp::ToolCallStatus::InProgress
1288                        }
1289                );
1290
1291                if cancel {
1292                    call.status = ToolCallStatus::Canceled;
1293                }
1294            }
1295        }
1296
1297        self.connection.cancel(&self.session_id, cx);
1298
1299        // Wait for the send task to complete
1300        cx.foreground_executor().spawn(send_task)
1301    }
1302
1303    /// Rewinds this thread to before the entry at `index`, removing it and all
1304    /// subsequent entries while reverting any changes made from that point.
1305    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1306        let Some(session_editor) = self.connection.session_editor(&self.session_id, cx) else {
1307            return Task::ready(Err(anyhow!("not supported")));
1308        };
1309        let Some(message) = self.user_message(&id) else {
1310            return Task::ready(Err(anyhow!("message not found")));
1311        };
1312
1313        let checkpoint = message
1314            .checkpoint
1315            .as_ref()
1316            .map(|c| c.git_checkpoint.clone());
1317
1318        let git_store = self.project.read(cx).git_store().clone();
1319        cx.spawn(async move |this, cx| {
1320            if let Some(checkpoint) = checkpoint {
1321                git_store
1322                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1323                    .await?;
1324            }
1325
1326            cx.update(|cx| session_editor.truncate(id.clone(), cx))?
1327                .await?;
1328            this.update(cx, |this, cx| {
1329                if let Some((ix, _)) = this.user_message_mut(&id) {
1330                    let range = ix..this.entries.len();
1331                    this.entries.truncate(ix);
1332                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
1333                }
1334            })
1335        })
1336    }
1337
1338    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1339        let git_store = self.project.read(cx).git_store().clone();
1340
1341        let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1342            if let Some(checkpoint) = message.checkpoint.as_ref() {
1343                checkpoint.git_checkpoint.clone()
1344            } else {
1345                return Task::ready(Ok(()));
1346            }
1347        } else {
1348            return Task::ready(Ok(()));
1349        };
1350
1351        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1352        cx.spawn(async move |this, cx| {
1353            let new_checkpoint = new_checkpoint
1354                .await
1355                .context("failed to get new checkpoint")
1356                .log_err();
1357            if let Some(new_checkpoint) = new_checkpoint {
1358                let equal = git_store
1359                    .update(cx, |git, cx| {
1360                        git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1361                    })?
1362                    .await
1363                    .unwrap_or(true);
1364                this.update(cx, |this, cx| {
1365                    let (ix, message) = this.last_user_message().context("no user message")?;
1366                    let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1367                    checkpoint.show = !equal;
1368                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1369                    anyhow::Ok(())
1370                })??;
1371            }
1372
1373            Ok(())
1374        })
1375    }
1376
1377    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1378        self.entries
1379            .iter_mut()
1380            .enumerate()
1381            .rev()
1382            .find_map(|(ix, entry)| {
1383                if let AgentThreadEntry::UserMessage(message) = entry {
1384                    Some((ix, message))
1385                } else {
1386                    None
1387                }
1388            })
1389    }
1390
1391    fn user_message(&self, id: &UserMessageId) -> Option<&UserMessage> {
1392        self.entries.iter().find_map(|entry| {
1393            if let AgentThreadEntry::UserMessage(message) = entry {
1394                if message.id.as_ref() == Some(&id) {
1395                    Some(message)
1396                } else {
1397                    None
1398                }
1399            } else {
1400                None
1401            }
1402        })
1403    }
1404
1405    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1406        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1407            if let AgentThreadEntry::UserMessage(message) = entry {
1408                if message.id.as_ref() == Some(&id) {
1409                    Some((ix, message))
1410                } else {
1411                    None
1412                }
1413            } else {
1414                None
1415            }
1416        })
1417    }
1418
1419    pub fn read_text_file(
1420        &self,
1421        path: PathBuf,
1422        line: Option<u32>,
1423        limit: Option<u32>,
1424        reuse_shared_snapshot: bool,
1425        cx: &mut Context<Self>,
1426    ) -> Task<Result<String>> {
1427        let project = self.project.clone();
1428        let action_log = self.action_log.clone();
1429        cx.spawn(async move |this, cx| {
1430            let load = project.update(cx, |project, cx| {
1431                let path = project
1432                    .project_path_for_absolute_path(&path, cx)
1433                    .context("invalid path")?;
1434                anyhow::Ok(project.open_buffer(path, cx))
1435            });
1436            let buffer = load??.await?;
1437
1438            let snapshot = if reuse_shared_snapshot {
1439                this.read_with(cx, |this, _| {
1440                    this.shared_buffers.get(&buffer.clone()).cloned()
1441                })
1442                .log_err()
1443                .flatten()
1444            } else {
1445                None
1446            };
1447
1448            let snapshot = if let Some(snapshot) = snapshot {
1449                snapshot
1450            } else {
1451                action_log.update(cx, |action_log, cx| {
1452                    action_log.buffer_read(buffer.clone(), cx);
1453                })?;
1454                project.update(cx, |project, cx| {
1455                    let position = buffer
1456                        .read(cx)
1457                        .snapshot()
1458                        .anchor_before(Point::new(line.unwrap_or_default(), 0));
1459                    project.set_agent_location(
1460                        Some(AgentLocation {
1461                            buffer: buffer.downgrade(),
1462                            position,
1463                        }),
1464                        cx,
1465                    );
1466                })?;
1467
1468                buffer.update(cx, |buffer, _| buffer.snapshot())?
1469            };
1470
1471            this.update(cx, |this, _| {
1472                let text = snapshot.text();
1473                this.shared_buffers.insert(buffer.clone(), snapshot);
1474                if line.is_none() && limit.is_none() {
1475                    return Ok(text);
1476                }
1477                let limit = limit.unwrap_or(u32::MAX) as usize;
1478                let Some(line) = line else {
1479                    return Ok(text.lines().take(limit).collect::<String>());
1480                };
1481
1482                let count = text.lines().count();
1483                if count < line as usize {
1484                    anyhow::bail!("There are only {} lines", count);
1485                }
1486                Ok(text
1487                    .lines()
1488                    .skip(line as usize + 1)
1489                    .take(limit)
1490                    .collect::<String>())
1491            })?
1492        })
1493    }
1494
1495    pub fn write_text_file(
1496        &self,
1497        path: PathBuf,
1498        content: String,
1499        cx: &mut Context<Self>,
1500    ) -> Task<Result<()>> {
1501        let project = self.project.clone();
1502        let action_log = self.action_log.clone();
1503        cx.spawn(async move |this, cx| {
1504            let load = project.update(cx, |project, cx| {
1505                let path = project
1506                    .project_path_for_absolute_path(&path, cx)
1507                    .context("invalid path")?;
1508                anyhow::Ok(project.open_buffer(path, cx))
1509            });
1510            let buffer = load??.await?;
1511            let snapshot = this.update(cx, |this, cx| {
1512                this.shared_buffers
1513                    .get(&buffer)
1514                    .cloned()
1515                    .unwrap_or_else(|| buffer.read(cx).snapshot())
1516            })?;
1517            let edits = cx
1518                .background_executor()
1519                .spawn(async move {
1520                    let old_text = snapshot.text();
1521                    text_diff(old_text.as_str(), &content)
1522                        .into_iter()
1523                        .map(|(range, replacement)| {
1524                            (
1525                                snapshot.anchor_after(range.start)
1526                                    ..snapshot.anchor_before(range.end),
1527                                replacement,
1528                            )
1529                        })
1530                        .collect::<Vec<_>>()
1531                })
1532                .await;
1533            cx.update(|cx| {
1534                project.update(cx, |project, cx| {
1535                    project.set_agent_location(
1536                        Some(AgentLocation {
1537                            buffer: buffer.downgrade(),
1538                            position: edits
1539                                .last()
1540                                .map(|(range, _)| range.end)
1541                                .unwrap_or(Anchor::MIN),
1542                        }),
1543                        cx,
1544                    );
1545                });
1546
1547                action_log.update(cx, |action_log, cx| {
1548                    action_log.buffer_read(buffer.clone(), cx);
1549                });
1550                buffer.update(cx, |buffer, cx| {
1551                    buffer.edit(edits, None, cx);
1552                });
1553                action_log.update(cx, |action_log, cx| {
1554                    action_log.buffer_edited(buffer.clone(), cx);
1555                });
1556            })?;
1557            project
1558                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
1559                .await
1560        })
1561    }
1562
1563    pub fn to_markdown(&self, cx: &App) -> String {
1564        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
1565    }
1566
1567    pub fn emit_server_exited(&mut self, status: ExitStatus, cx: &mut Context<Self>) {
1568        cx.emit(AcpThreadEvent::ServerExited(status));
1569    }
1570}
1571
1572fn markdown_for_raw_output(
1573    raw_output: &serde_json::Value,
1574    language_registry: &Arc<LanguageRegistry>,
1575    cx: &mut App,
1576) -> Option<Entity<Markdown>> {
1577    match raw_output {
1578        serde_json::Value::Null => None,
1579        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
1580            Markdown::new(
1581                value.to_string().into(),
1582                Some(language_registry.clone()),
1583                None,
1584                cx,
1585            )
1586        })),
1587        serde_json::Value::Number(value) => Some(cx.new(|cx| {
1588            Markdown::new(
1589                value.to_string().into(),
1590                Some(language_registry.clone()),
1591                None,
1592                cx,
1593            )
1594        })),
1595        serde_json::Value::String(value) => Some(cx.new(|cx| {
1596            Markdown::new(
1597                value.clone().into(),
1598                Some(language_registry.clone()),
1599                None,
1600                cx,
1601            )
1602        })),
1603        value => Some(cx.new(|cx| {
1604            Markdown::new(
1605                format!("```json\n{}\n```", value).into(),
1606                Some(language_registry.clone()),
1607                None,
1608                cx,
1609            )
1610        })),
1611    }
1612}
1613
1614#[cfg(test)]
1615mod tests {
1616    use super::*;
1617    use anyhow::anyhow;
1618    use futures::{channel::mpsc, future::LocalBoxFuture, select};
1619    use gpui::{AsyncApp, TestAppContext, WeakEntity};
1620    use indoc::indoc;
1621    use project::{FakeFs, Fs};
1622    use rand::Rng as _;
1623    use serde_json::json;
1624    use settings::SettingsStore;
1625    use smol::stream::StreamExt as _;
1626    use std::{
1627        any::Any,
1628        cell::RefCell,
1629        path::Path,
1630        rc::Rc,
1631        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
1632        time::Duration,
1633    };
1634    use util::path;
1635
1636    fn init_test(cx: &mut TestAppContext) {
1637        env_logger::try_init().ok();
1638        cx.update(|cx| {
1639            let settings_store = SettingsStore::test(cx);
1640            cx.set_global(settings_store);
1641            Project::init_settings(cx);
1642            language::init(cx);
1643        });
1644    }
1645
1646    #[gpui::test]
1647    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
1648        init_test(cx);
1649
1650        let fs = FakeFs::new(cx.executor());
1651        let project = Project::test(fs, [], cx).await;
1652        let connection = Rc::new(FakeAgentConnection::new());
1653        let thread = cx
1654            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1655            .await
1656            .unwrap();
1657
1658        // Test creating a new user message
1659        thread.update(cx, |thread, cx| {
1660            thread.push_user_content_block(
1661                None,
1662                acp::ContentBlock::Text(acp::TextContent {
1663                    annotations: None,
1664                    text: "Hello, ".to_string(),
1665                }),
1666                cx,
1667            );
1668        });
1669
1670        thread.update(cx, |thread, cx| {
1671            assert_eq!(thread.entries.len(), 1);
1672            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1673                assert_eq!(user_msg.id, None);
1674                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
1675            } else {
1676                panic!("Expected UserMessage");
1677            }
1678        });
1679
1680        // Test appending to existing user message
1681        let message_1_id = UserMessageId::new();
1682        thread.update(cx, |thread, cx| {
1683            thread.push_user_content_block(
1684                Some(message_1_id.clone()),
1685                acp::ContentBlock::Text(acp::TextContent {
1686                    annotations: None,
1687                    text: "world!".to_string(),
1688                }),
1689                cx,
1690            );
1691        });
1692
1693        thread.update(cx, |thread, cx| {
1694            assert_eq!(thread.entries.len(), 1);
1695            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1696                assert_eq!(user_msg.id, Some(message_1_id));
1697                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
1698            } else {
1699                panic!("Expected UserMessage");
1700            }
1701        });
1702
1703        // Test creating new user message after assistant message
1704        thread.update(cx, |thread, cx| {
1705            thread.push_assistant_content_block(
1706                acp::ContentBlock::Text(acp::TextContent {
1707                    annotations: None,
1708                    text: "Assistant response".to_string(),
1709                }),
1710                false,
1711                cx,
1712            );
1713        });
1714
1715        let message_2_id = UserMessageId::new();
1716        thread.update(cx, |thread, cx| {
1717            thread.push_user_content_block(
1718                Some(message_2_id.clone()),
1719                acp::ContentBlock::Text(acp::TextContent {
1720                    annotations: None,
1721                    text: "New user message".to_string(),
1722                }),
1723                cx,
1724            );
1725        });
1726
1727        thread.update(cx, |thread, cx| {
1728            assert_eq!(thread.entries.len(), 3);
1729            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
1730                assert_eq!(user_msg.id, Some(message_2_id));
1731                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
1732            } else {
1733                panic!("Expected UserMessage at index 2");
1734            }
1735        });
1736    }
1737
1738    #[gpui::test]
1739    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
1740        init_test(cx);
1741
1742        let fs = FakeFs::new(cx.executor());
1743        let project = Project::test(fs, [], cx).await;
1744        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1745            |_, thread, mut cx| {
1746                async move {
1747                    thread.update(&mut cx, |thread, cx| {
1748                        thread
1749                            .handle_session_update(
1750                                acp::SessionUpdate::AgentThoughtChunk {
1751                                    content: "Thinking ".into(),
1752                                },
1753                                cx,
1754                            )
1755                            .unwrap();
1756                        thread
1757                            .handle_session_update(
1758                                acp::SessionUpdate::AgentThoughtChunk {
1759                                    content: "hard!".into(),
1760                                },
1761                                cx,
1762                            )
1763                            .unwrap();
1764                    })?;
1765                    Ok(acp::PromptResponse {
1766                        stop_reason: acp::StopReason::EndTurn,
1767                    })
1768                }
1769                .boxed_local()
1770            },
1771        ));
1772
1773        let thread = cx
1774            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1775            .await
1776            .unwrap();
1777
1778        thread
1779            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
1780            .await
1781            .unwrap();
1782
1783        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
1784        assert_eq!(
1785            output,
1786            indoc! {r#"
1787            ## User
1788
1789            Hello from Zed!
1790
1791            ## Assistant
1792
1793            <thinking>
1794            Thinking hard!
1795            </thinking>
1796
1797            "#}
1798        );
1799    }
1800
1801    #[gpui::test]
1802    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
1803        init_test(cx);
1804
1805        let fs = FakeFs::new(cx.executor());
1806        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
1807            .await;
1808        let project = Project::test(fs.clone(), [], cx).await;
1809        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
1810        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
1811        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1812            move |_, thread, mut cx| {
1813                let read_file_tx = read_file_tx.clone();
1814                async move {
1815                    let content = thread
1816                        .update(&mut cx, |thread, cx| {
1817                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
1818                        })
1819                        .unwrap()
1820                        .await
1821                        .unwrap();
1822                    assert_eq!(content, "one\ntwo\nthree\n");
1823                    read_file_tx.take().unwrap().send(()).unwrap();
1824                    thread
1825                        .update(&mut cx, |thread, cx| {
1826                            thread.write_text_file(
1827                                path!("/tmp/foo").into(),
1828                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
1829                                cx,
1830                            )
1831                        })
1832                        .unwrap()
1833                        .await
1834                        .unwrap();
1835                    Ok(acp::PromptResponse {
1836                        stop_reason: acp::StopReason::EndTurn,
1837                    })
1838                }
1839                .boxed_local()
1840            },
1841        ));
1842
1843        let (worktree, pathbuf) = project
1844            .update(cx, |project, cx| {
1845                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
1846            })
1847            .await
1848            .unwrap();
1849        let buffer = project
1850            .update(cx, |project, cx| {
1851                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
1852            })
1853            .await
1854            .unwrap();
1855
1856        let thread = cx
1857            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
1858            .await
1859            .unwrap();
1860
1861        let request = thread.update(cx, |thread, cx| {
1862            thread.send_raw("Extend the count in /tmp/foo", cx)
1863        });
1864        read_file_rx.await.ok();
1865        buffer.update(cx, |buffer, cx| {
1866            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
1867        });
1868        cx.run_until_parked();
1869        assert_eq!(
1870            buffer.read_with(cx, |buffer, _| buffer.text()),
1871            "zero\none\ntwo\nthree\nfour\nfive\n"
1872        );
1873        assert_eq!(
1874            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
1875            "zero\none\ntwo\nthree\nfour\nfive\n"
1876        );
1877        request.await.unwrap();
1878    }
1879
1880    #[gpui::test]
1881    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
1882        init_test(cx);
1883
1884        let fs = FakeFs::new(cx.executor());
1885        let project = Project::test(fs, [], cx).await;
1886        let id = acp::ToolCallId("test".into());
1887
1888        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
1889            let id = id.clone();
1890            move |_, thread, mut cx| {
1891                let id = id.clone();
1892                async move {
1893                    thread
1894                        .update(&mut cx, |thread, cx| {
1895                            thread.handle_session_update(
1896                                acp::SessionUpdate::ToolCall(acp::ToolCall {
1897                                    id: id.clone(),
1898                                    title: "Label".into(),
1899                                    kind: acp::ToolKind::Fetch,
1900                                    status: acp::ToolCallStatus::InProgress,
1901                                    content: vec![],
1902                                    locations: vec![],
1903                                    raw_input: None,
1904                                    raw_output: None,
1905                                }),
1906                                cx,
1907                            )
1908                        })
1909                        .unwrap()
1910                        .unwrap();
1911                    Ok(acp::PromptResponse {
1912                        stop_reason: acp::StopReason::EndTurn,
1913                    })
1914                }
1915                .boxed_local()
1916            }
1917        }));
1918
1919        let thread = cx
1920            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1921            .await
1922            .unwrap();
1923
1924        let request = thread.update(cx, |thread, cx| {
1925            thread.send_raw("Fetch https://example.com", cx)
1926        });
1927
1928        run_until_first_tool_call(&thread, cx).await;
1929
1930        thread.read_with(cx, |thread, _| {
1931            assert!(matches!(
1932                thread.entries[1],
1933                AgentThreadEntry::ToolCall(ToolCall {
1934                    status: ToolCallStatus::Allowed {
1935                        status: acp::ToolCallStatus::InProgress,
1936                        ..
1937                    },
1938                    ..
1939                })
1940            ));
1941        });
1942
1943        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
1944
1945        thread.read_with(cx, |thread, _| {
1946            assert!(matches!(
1947                &thread.entries[1],
1948                AgentThreadEntry::ToolCall(ToolCall {
1949                    status: ToolCallStatus::Canceled,
1950                    ..
1951                })
1952            ));
1953        });
1954
1955        thread
1956            .update(cx, |thread, cx| {
1957                thread.handle_session_update(
1958                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
1959                        id,
1960                        fields: acp::ToolCallUpdateFields {
1961                            status: Some(acp::ToolCallStatus::Completed),
1962                            ..Default::default()
1963                        },
1964                    }),
1965                    cx,
1966                )
1967            })
1968            .unwrap();
1969
1970        request.await.unwrap();
1971
1972        thread.read_with(cx, |thread, _| {
1973            assert!(matches!(
1974                thread.entries[1],
1975                AgentThreadEntry::ToolCall(ToolCall {
1976                    status: ToolCallStatus::Allowed {
1977                        status: acp::ToolCallStatus::Completed,
1978                        ..
1979                    },
1980                    ..
1981                })
1982            ));
1983        });
1984    }
1985
1986    #[gpui::test]
1987    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
1988        init_test(cx);
1989        let fs = FakeFs::new(cx.background_executor.clone());
1990        fs.insert_tree(path!("/test"), json!({})).await;
1991        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
1992
1993        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
1994            move |_, thread, mut cx| {
1995                async move {
1996                    thread
1997                        .update(&mut cx, |thread, cx| {
1998                            thread.handle_session_update(
1999                                acp::SessionUpdate::ToolCall(acp::ToolCall {
2000                                    id: acp::ToolCallId("test".into()),
2001                                    title: "Label".into(),
2002                                    kind: acp::ToolKind::Edit,
2003                                    status: acp::ToolCallStatus::Completed,
2004                                    content: vec![acp::ToolCallContent::Diff {
2005                                        diff: acp::Diff {
2006                                            path: "/test/test.txt".into(),
2007                                            old_text: None,
2008                                            new_text: "foo".into(),
2009                                        },
2010                                    }],
2011                                    locations: vec![],
2012                                    raw_input: None,
2013                                    raw_output: None,
2014                                }),
2015                                cx,
2016                            )
2017                        })
2018                        .unwrap()
2019                        .unwrap();
2020                    Ok(acp::PromptResponse {
2021                        stop_reason: acp::StopReason::EndTurn,
2022                    })
2023                }
2024                .boxed_local()
2025            }
2026        }));
2027
2028        let thread = cx
2029            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2030            .await
2031            .unwrap();
2032
2033        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
2034            .await
2035            .unwrap();
2036
2037        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
2038    }
2039
2040    #[gpui::test(iterations = 10)]
2041    async fn test_checkpoints(cx: &mut TestAppContext) {
2042        init_test(cx);
2043        let fs = FakeFs::new(cx.background_executor.clone());
2044        fs.insert_tree(
2045            path!("/test"),
2046            json!({
2047                ".git": {}
2048            }),
2049        )
2050        .await;
2051        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
2052
2053        let simulate_changes = Arc::new(AtomicBool::new(true));
2054        let next_filename = Arc::new(AtomicUsize::new(0));
2055        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2056            let simulate_changes = simulate_changes.clone();
2057            let next_filename = next_filename.clone();
2058            let fs = fs.clone();
2059            move |request, thread, mut cx| {
2060                let fs = fs.clone();
2061                let simulate_changes = simulate_changes.clone();
2062                let next_filename = next_filename.clone();
2063                async move {
2064                    if simulate_changes.load(SeqCst) {
2065                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
2066                        fs.write(Path::new(&filename), b"").await?;
2067                    }
2068
2069                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
2070                        panic!("expected text content block");
2071                    };
2072                    thread.update(&mut cx, |thread, cx| {
2073                        thread
2074                            .handle_session_update(
2075                                acp::SessionUpdate::AgentMessageChunk {
2076                                    content: content.text.to_uppercase().into(),
2077                                },
2078                                cx,
2079                            )
2080                            .unwrap();
2081                    })?;
2082                    Ok(acp::PromptResponse {
2083                        stop_reason: acp::StopReason::EndTurn,
2084                    })
2085                }
2086                .boxed_local()
2087            }
2088        }));
2089        let thread = cx
2090            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2091            .await
2092            .unwrap();
2093
2094        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
2095            .await
2096            .unwrap();
2097        thread.read_with(cx, |thread, cx| {
2098            assert_eq!(
2099                thread.to_markdown(cx),
2100                indoc! {"
2101                    ## User (checkpoint)
2102
2103                    Lorem
2104
2105                    ## Assistant
2106
2107                    LOREM
2108
2109                "}
2110            );
2111        });
2112        assert_eq!(fs.files(), vec![Path::new("/test/file-0")]);
2113
2114        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
2115            .await
2116            .unwrap();
2117        thread.read_with(cx, |thread, cx| {
2118            assert_eq!(
2119                thread.to_markdown(cx),
2120                indoc! {"
2121                    ## User (checkpoint)
2122
2123                    Lorem
2124
2125                    ## Assistant
2126
2127                    LOREM
2128
2129                    ## User (checkpoint)
2130
2131                    ipsum
2132
2133                    ## Assistant
2134
2135                    IPSUM
2136
2137                "}
2138            );
2139        });
2140        assert_eq!(
2141            fs.files(),
2142            vec![Path::new("/test/file-0"), Path::new("/test/file-1")]
2143        );
2144
2145        // Checkpoint isn't stored when there are no changes.
2146        simulate_changes.store(false, SeqCst);
2147        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
2148            .await
2149            .unwrap();
2150        thread.read_with(cx, |thread, cx| {
2151            assert_eq!(
2152                thread.to_markdown(cx),
2153                indoc! {"
2154                    ## User (checkpoint)
2155
2156                    Lorem
2157
2158                    ## Assistant
2159
2160                    LOREM
2161
2162                    ## User (checkpoint)
2163
2164                    ipsum
2165
2166                    ## Assistant
2167
2168                    IPSUM
2169
2170                    ## User
2171
2172                    dolor
2173
2174                    ## Assistant
2175
2176                    DOLOR
2177
2178                "}
2179            );
2180        });
2181        assert_eq!(
2182            fs.files(),
2183            vec![Path::new("/test/file-0"), Path::new("/test/file-1")]
2184        );
2185
2186        // Rewinding the conversation truncates the history and restores the checkpoint.
2187        thread
2188            .update(cx, |thread, cx| {
2189                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
2190                    panic!("unexpected entries {:?}", thread.entries)
2191                };
2192                thread.rewind(message.id.clone().unwrap(), cx)
2193            })
2194            .await
2195            .unwrap();
2196        thread.read_with(cx, |thread, cx| {
2197            assert_eq!(
2198                thread.to_markdown(cx),
2199                indoc! {"
2200                    ## User (checkpoint)
2201
2202                    Lorem
2203
2204                    ## Assistant
2205
2206                    LOREM
2207
2208                "}
2209            );
2210        });
2211        assert_eq!(fs.files(), vec![Path::new("/test/file-0")]);
2212    }
2213
2214    async fn run_until_first_tool_call(
2215        thread: &Entity<AcpThread>,
2216        cx: &mut TestAppContext,
2217    ) -> usize {
2218        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
2219
2220        let subscription = cx.update(|cx| {
2221            cx.subscribe(thread, move |thread, _, cx| {
2222                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
2223                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
2224                        return tx.try_send(ix).unwrap();
2225                    }
2226                }
2227            })
2228        });
2229
2230        select! {
2231            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
2232                panic!("Timeout waiting for tool call")
2233            }
2234            ix = rx.next().fuse() => {
2235                drop(subscription);
2236                ix.unwrap()
2237            }
2238        }
2239    }
2240
2241    #[derive(Clone, Default)]
2242    struct FakeAgentConnection {
2243        auth_methods: Vec<acp::AuthMethod>,
2244        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
2245        on_user_message: Option<
2246            Rc<
2247                dyn Fn(
2248                        acp::PromptRequest,
2249                        WeakEntity<AcpThread>,
2250                        AsyncApp,
2251                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2252                    + 'static,
2253            >,
2254        >,
2255    }
2256
2257    impl FakeAgentConnection {
2258        fn new() -> Self {
2259            Self {
2260                auth_methods: Vec::new(),
2261                on_user_message: None,
2262                sessions: Arc::default(),
2263            }
2264        }
2265
2266        #[expect(unused)]
2267        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
2268            self.auth_methods = auth_methods;
2269            self
2270        }
2271
2272        fn on_user_message(
2273            mut self,
2274            handler: impl Fn(
2275                acp::PromptRequest,
2276                WeakEntity<AcpThread>,
2277                AsyncApp,
2278            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2279            + 'static,
2280        ) -> Self {
2281            self.on_user_message.replace(Rc::new(handler));
2282            self
2283        }
2284    }
2285
2286    impl AgentConnection for FakeAgentConnection {
2287        fn auth_methods(&self) -> &[acp::AuthMethod] {
2288            &self.auth_methods
2289        }
2290
2291        fn new_thread(
2292            self: Rc<Self>,
2293            project: Entity<Project>,
2294            _cwd: &Path,
2295            cx: &mut gpui::App,
2296        ) -> Task<gpui::Result<Entity<AcpThread>>> {
2297            let session_id = acp::SessionId(
2298                rand::thread_rng()
2299                    .sample_iter(&rand::distributions::Alphanumeric)
2300                    .take(7)
2301                    .map(char::from)
2302                    .collect::<String>()
2303                    .into(),
2304            );
2305            let thread =
2306                cx.new(|cx| AcpThread::new("Test", self.clone(), project, session_id.clone(), cx));
2307            self.sessions.lock().insert(session_id, thread.downgrade());
2308            Task::ready(Ok(thread))
2309        }
2310
2311        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
2312            if self.auth_methods().iter().any(|m| m.id == method) {
2313                Task::ready(Ok(()))
2314            } else {
2315                Task::ready(Err(anyhow!("Invalid Auth Method")))
2316            }
2317        }
2318
2319        fn prompt(
2320            &self,
2321            _id: Option<UserMessageId>,
2322            params: acp::PromptRequest,
2323            cx: &mut App,
2324        ) -> Task<gpui::Result<acp::PromptResponse>> {
2325            let sessions = self.sessions.lock();
2326            let thread = sessions.get(&params.session_id).unwrap();
2327            if let Some(handler) = &self.on_user_message {
2328                let handler = handler.clone();
2329                let thread = thread.clone();
2330                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
2331            } else {
2332                Task::ready(Ok(acp::PromptResponse {
2333                    stop_reason: acp::StopReason::EndTurn,
2334                }))
2335            }
2336        }
2337
2338        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
2339            let sessions = self.sessions.lock();
2340            let thread = sessions.get(&session_id).unwrap().clone();
2341
2342            cx.spawn(async move |cx| {
2343                thread
2344                    .update(cx, |thread, cx| thread.cancel(cx))
2345                    .unwrap()
2346                    .await
2347            })
2348            .detach();
2349        }
2350
2351        fn session_editor(
2352            &self,
2353            session_id: &acp::SessionId,
2354            _cx: &mut App,
2355        ) -> Option<Rc<dyn AgentSessionEditor>> {
2356            Some(Rc::new(FakeAgentSessionEditor {
2357                _session_id: session_id.clone(),
2358            }))
2359        }
2360
2361        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
2362            self
2363        }
2364    }
2365
2366    struct FakeAgentSessionEditor {
2367        _session_id: acp::SessionId,
2368    }
2369
2370    impl AgentSessionEditor for FakeAgentSessionEditor {
2371        fn truncate(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
2372            Task::ready(Ok(()))
2373        }
2374    }
2375}