acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5
   6pub use connection::*;
   7pub use diff::*;
   8pub use mention::*;
   9pub use terminal::*;
  10
  11use action_log::ActionLog;
  12use agent_client_protocol as acp;
  13use anyhow::{Context as _, Result, anyhow};
  14use editor::Bias;
  15use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  16use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  17use itertools::Itertools;
  18use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  19use markdown::Markdown;
  20use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  21use std::collections::HashMap;
  22use std::error::Error;
  23use std::fmt::{Formatter, Write};
  24use std::ops::Range;
  25use std::process::ExitStatus;
  26use std::rc::Rc;
  27use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  28use ui::App;
  29use util::ResultExt;
  30
/// A message authored by the user within a thread.
#[derive(Debug)]
pub struct UserMessage {
    /// Identifier of the message, when one has been provided.
    pub id: Option<UserMessageId>,
    /// Display content accumulated from the chunks appended to this message.
    pub content: ContentBlock,
    /// The protocol content blocks that make up this message, in arrival order.
    pub chunks: Vec<acp::ContentBlock>,
    /// Checkpoint attached to this message, if any.
    pub checkpoint: Option<Checkpoint>,
}
  38
/// A git store checkpoint paired with a visibility flag.
#[derive(Debug)]
pub struct Checkpoint {
    /// The underlying checkpoint handle from the git store.
    git_checkpoint: GitStoreCheckpoint,
    /// When `true`, the owning user message is rendered with a `(checkpoint)` heading suffix.
    pub show: bool,
}
  44
  45impl UserMessage {
  46    fn to_markdown(&self, cx: &App) -> String {
  47        let mut markdown = String::new();
  48        if self
  49            .checkpoint
  50            .as_ref()
  51            .map_or(false, |checkpoint| checkpoint.show)
  52        {
  53            writeln!(markdown, "## User (checkpoint)").unwrap();
  54        } else {
  55            writeln!(markdown, "## User").unwrap();
  56        }
  57        writeln!(markdown).unwrap();
  58        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
  59        writeln!(markdown).unwrap();
  60        markdown
  61    }
  62}
  63
/// A message produced by the assistant, stored as an ordered list of chunks.
#[derive(Debug, PartialEq)]
pub struct AssistantMessage {
    /// Message and thought chunks in the order they were received.
    pub chunks: Vec<AssistantMessageChunk>,
}
  68
  69impl AssistantMessage {
  70    pub fn to_markdown(&self, cx: &App) -> String {
  71        format!(
  72            "## Assistant\n\n{}\n\n",
  73            self.chunks
  74                .iter()
  75                .map(|chunk| chunk.to_markdown(cx))
  76                .join("\n\n")
  77        )
  78    }
  79}
  80
/// One piece of an assistant message.
#[derive(Debug, PartialEq)]
pub enum AssistantMessageChunk {
    /// Regular message content.
    Message { block: ContentBlock },
    /// Thought content; rendered to markdown wrapped in `<thinking>` tags.
    Thought { block: ContentBlock },
}
  86
  87impl AssistantMessageChunk {
  88    pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
  89        Self::Message {
  90            block: ContentBlock::new(chunk.into(), language_registry, cx),
  91        }
  92    }
  93
  94    fn to_markdown(&self, cx: &App) -> String {
  95        match self {
  96            Self::Message { block } => block.to_markdown(cx).to_string(),
  97            Self::Thought { block } => {
  98                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
  99            }
 100        }
 101    }
 102}
 103
/// A single entry in a thread: a user message, an assistant message, or a tool call.
#[derive(Debug)]
pub enum AgentThreadEntry {
    /// A message from the user.
    UserMessage(UserMessage),
    /// A message from the assistant.
    AssistantMessage(AssistantMessage),
    /// A tool invocation together with its content and status.
    ToolCall(ToolCall),
}
 110
 111impl AgentThreadEntry {
 112    fn to_markdown(&self, cx: &App) -> String {
 113        match self {
 114            Self::UserMessage(message) => message.to_markdown(cx),
 115            Self::AssistantMessage(message) => message.to_markdown(cx),
 116            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 117        }
 118    }
 119
 120    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 121        if let AgentThreadEntry::ToolCall(call) = self {
 122            itertools::Either::Left(call.diffs())
 123        } else {
 124            itertools::Either::Right(std::iter::empty())
 125        }
 126    }
 127
 128    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 129        if let AgentThreadEntry::ToolCall(call) = self {
 130            itertools::Either::Left(call.terminals())
 131        } else {
 132            itertools::Either::Right(std::iter::empty())
 133        }
 134    }
 135
 136    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 137        if let AgentThreadEntry::ToolCall(ToolCall {
 138            locations,
 139            resolved_locations,
 140            ..
 141        }) = self
 142        {
 143            Some((
 144                locations.get(ix)?.clone(),
 145                resolved_locations.get(ix)?.clone()?,
 146            ))
 147        } else {
 148            None
 149        }
 150    }
 151}
 152
/// A tool call tracked by the thread.
#[derive(Debug)]
pub struct ToolCall {
    /// Identifier used to match updates to this call.
    pub id: acp::ToolCallId,
    /// Markdown entity built from the tool call's title.
    pub label: Entity<Markdown>,
    /// Kind of tool, as reported by the protocol.
    pub kind: acp::ToolKind,
    /// Content items (content blocks, diffs, terminals) attached to the call.
    pub content: Vec<ToolCallContent>,
    /// Current status of the call.
    pub status: ToolCallStatus,
    /// Locations reported for this call.
    pub locations: Vec<acp::ToolCallLocation>,
    /// Resolved counterparts of `locations`, looked up by the same index; starts out empty.
    pub resolved_locations: Vec<Option<AgentLocation>>,
    /// Raw input payload, if provided.
    pub raw_input: Option<serde_json::Value>,
    /// Raw output payload, if provided.
    pub raw_output: Option<serde_json::Value>,
}
 165
 166impl ToolCall {
 167    fn from_acp(
 168        tool_call: acp::ToolCall,
 169        status: ToolCallStatus,
 170        language_registry: Arc<LanguageRegistry>,
 171        cx: &mut App,
 172    ) -> Self {
 173        Self {
 174            id: tool_call.id,
 175            label: cx.new(|cx| {
 176                Markdown::new(
 177                    tool_call.title.into(),
 178                    Some(language_registry.clone()),
 179                    None,
 180                    cx,
 181                )
 182            }),
 183            kind: tool_call.kind,
 184            content: tool_call
 185                .content
 186                .into_iter()
 187                .map(|content| ToolCallContent::from_acp(content, language_registry.clone(), cx))
 188                .collect(),
 189            locations: tool_call.locations,
 190            resolved_locations: Vec::default(),
 191            status,
 192            raw_input: tool_call.raw_input,
 193            raw_output: tool_call.raw_output,
 194        }
 195    }
 196
 197    fn update_fields(
 198        &mut self,
 199        fields: acp::ToolCallUpdateFields,
 200        language_registry: Arc<LanguageRegistry>,
 201        cx: &mut App,
 202    ) {
 203        let acp::ToolCallUpdateFields {
 204            kind,
 205            status,
 206            title,
 207            content,
 208            locations,
 209            raw_input,
 210            raw_output,
 211        } = fields;
 212
 213        if let Some(kind) = kind {
 214            self.kind = kind;
 215        }
 216
 217        if let Some(status) = status {
 218            self.status = ToolCallStatus::Allowed { status };
 219        }
 220
 221        if let Some(title) = title {
 222            self.label.update(cx, |label, cx| {
 223                label.replace(title, cx);
 224            });
 225        }
 226
 227        if let Some(content) = content {
 228            self.content = content
 229                .into_iter()
 230                .map(|chunk| ToolCallContent::from_acp(chunk, language_registry.clone(), cx))
 231                .collect();
 232        }
 233
 234        if let Some(locations) = locations {
 235            self.locations = locations;
 236        }
 237
 238        if let Some(raw_input) = raw_input {
 239            self.raw_input = Some(raw_input);
 240        }
 241
 242        if let Some(raw_output) = raw_output {
 243            if self.content.is_empty() {
 244                if let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 245                {
 246                    self.content
 247                        .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 248                            markdown,
 249                        }));
 250                }
 251            }
 252            self.raw_output = Some(raw_output);
 253        }
 254    }
 255
 256    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 257        self.content.iter().filter_map(|content| match content {
 258            ToolCallContent::Diff(diff) => Some(diff),
 259            ToolCallContent::ContentBlock(_) => None,
 260            ToolCallContent::Terminal(_) => None,
 261        })
 262    }
 263
 264    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 265        self.content.iter().filter_map(|content| match content {
 266            ToolCallContent::Terminal(terminal) => Some(terminal),
 267            ToolCallContent::ContentBlock(_) => None,
 268            ToolCallContent::Diff(_) => None,
 269        })
 270    }
 271
 272    fn to_markdown(&self, cx: &App) -> String {
 273        let mut markdown = format!(
 274            "**Tool Call: {}**\nStatus: {}\n\n",
 275            self.label.read(cx).source(),
 276            self.status
 277        );
 278        for content in &self.content {
 279            markdown.push_str(content.to_markdown(cx).as_str());
 280            markdown.push_str("\n\n");
 281        }
 282        markdown
 283    }
 284
 285    async fn resolve_location(
 286        location: acp::ToolCallLocation,
 287        project: WeakEntity<Project>,
 288        cx: &mut AsyncApp,
 289    ) -> Option<AgentLocation> {
 290        let buffer = project
 291            .update(cx, |project, cx| {
 292                if let Some(path) = project.project_path_for_absolute_path(&location.path, cx) {
 293                    Some(project.open_buffer(path, cx))
 294                } else {
 295                    None
 296                }
 297            })
 298            .ok()??;
 299        let buffer = buffer.await.log_err()?;
 300        let position = buffer
 301            .update(cx, |buffer, _| {
 302                if let Some(row) = location.line {
 303                    let snapshot = buffer.snapshot();
 304                    let column = snapshot.indent_size_for_line(row).len;
 305                    let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 306                    snapshot.anchor_before(point)
 307                } else {
 308                    Anchor::MIN
 309                }
 310            })
 311            .ok()?;
 312
 313        Some(AgentLocation {
 314            buffer: buffer.downgrade(),
 315            position,
 316        })
 317    }
 318
 319    fn resolve_locations(
 320        &self,
 321        project: Entity<Project>,
 322        cx: &mut App,
 323    ) -> Task<Vec<Option<AgentLocation>>> {
 324        let locations = self.locations.clone();
 325        project.update(cx, |_, cx| {
 326            cx.spawn(async move |project, cx| {
 327                let mut new_locations = Vec::new();
 328                for location in locations {
 329                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 330                }
 331                new_locations
 332            })
 333        })
 334    }
 335}
 336
/// Status of a tool call as tracked by the thread.
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The call is awaiting a choice among the offered permission options.
    WaitingForConfirmation {
        /// Permission options that can be chosen.
        options: Vec<acp::PermissionOption>,
        /// One-shot sender that carries the id of the chosen option.
        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
    },
    /// The call carries a protocol-level status.
    Allowed {
        /// Status as reported by the protocol.
        status: acp::ToolCallStatus,
    },
    /// Displayed as "Rejected".
    Rejected,
    /// Displayed as "Canceled".
    Canceled,
}
 349
 350impl Display for ToolCallStatus {
 351    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 352        write!(
 353            f,
 354            "{}",
 355            match self {
 356                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 357                ToolCallStatus::Allowed { status } => match status {
 358                    acp::ToolCallStatus::Pending => "Pending",
 359                    acp::ToolCallStatus::InProgress => "In Progress",
 360                    acp::ToolCallStatus::Completed => "Completed",
 361                    acp::ToolCallStatus::Failed => "Failed",
 362                },
 363                ToolCallStatus::Rejected => "Rejected",
 364                ToolCallStatus::Canceled => "Canceled",
 365            }
 366        )
 367    }
 368}
 369
/// Displayable content accumulated from protocol content blocks.
#[derive(Debug, PartialEq, Clone)]
pub enum ContentBlock {
    /// Nothing has been appended yet.
    Empty,
    /// Content held in a markdown entity.
    Markdown { markdown: Entity<Markdown> },
    /// A single resource link; appending further content converts this into `Markdown`.
    ResourceLink { resource_link: acp::ResourceLink },
}
 376
 377impl ContentBlock {
 378    pub fn new(
 379        block: acp::ContentBlock,
 380        language_registry: &Arc<LanguageRegistry>,
 381        cx: &mut App,
 382    ) -> Self {
 383        let mut this = Self::Empty;
 384        this.append(block, language_registry, cx);
 385        this
 386    }
 387
 388    pub fn new_combined(
 389        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 390        language_registry: Arc<LanguageRegistry>,
 391        cx: &mut App,
 392    ) -> Self {
 393        let mut this = Self::Empty;
 394        for block in blocks {
 395            this.append(block, &language_registry, cx);
 396        }
 397        this
 398    }
 399
 400    pub fn append(
 401        &mut self,
 402        block: acp::ContentBlock,
 403        language_registry: &Arc<LanguageRegistry>,
 404        cx: &mut App,
 405    ) {
 406        if matches!(self, ContentBlock::Empty) {
 407            if let acp::ContentBlock::ResourceLink(resource_link) = block {
 408                *self = ContentBlock::ResourceLink { resource_link };
 409                return;
 410            }
 411        }
 412
 413        let new_content = self.block_string_contents(block);
 414
 415        match self {
 416            ContentBlock::Empty => {
 417                *self = Self::create_markdown_block(new_content, language_registry, cx);
 418            }
 419            ContentBlock::Markdown { markdown } => {
 420                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 421            }
 422            ContentBlock::ResourceLink { resource_link } => {
 423                let existing_content = Self::resource_link_md(&resource_link.uri);
 424                let combined = format!("{}\n{}", existing_content, new_content);
 425
 426                *self = Self::create_markdown_block(combined, language_registry, cx);
 427            }
 428        }
 429    }
 430
 431    fn create_markdown_block(
 432        content: String,
 433        language_registry: &Arc<LanguageRegistry>,
 434        cx: &mut App,
 435    ) -> ContentBlock {
 436        ContentBlock::Markdown {
 437            markdown: cx
 438                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 439        }
 440    }
 441
 442    fn block_string_contents(&self, block: acp::ContentBlock) -> String {
 443        match block {
 444            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 445            acp::ContentBlock::ResourceLink(resource_link) => {
 446                Self::resource_link_md(&resource_link.uri)
 447            }
 448            acp::ContentBlock::Resource(acp::EmbeddedResource {
 449                resource:
 450                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 451                        uri,
 452                        ..
 453                    }),
 454                ..
 455            }) => Self::resource_link_md(&uri),
 456            acp::ContentBlock::Image(image) => Self::image_md(&image),
 457            acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
 458        }
 459    }
 460
 461    fn resource_link_md(uri: &str) -> String {
 462        if let Some(uri) = MentionUri::parse(&uri).log_err() {
 463            uri.as_link().to_string()
 464        } else {
 465            uri.to_string()
 466        }
 467    }
 468
 469    fn image_md(_image: &acp::ImageContent) -> String {
 470        "`Image`".into()
 471    }
 472
 473    fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 474        match self {
 475            ContentBlock::Empty => "",
 476            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 477            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 478        }
 479    }
 480
 481    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 482        match self {
 483            ContentBlock::Empty => None,
 484            ContentBlock::Markdown { markdown } => Some(markdown),
 485            ContentBlock::ResourceLink { .. } => None,
 486        }
 487    }
 488
 489    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 490        match self {
 491            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 492            _ => None,
 493        }
 494    }
 495}
 496
/// A single content item attached to a tool call.
#[derive(Debug)]
pub enum ToolCallContent {
    /// Textual or resource-link content.
    ContentBlock(ContentBlock),
    /// A diff entity.
    Diff(Entity<Diff>),
    /// A terminal entity.
    Terminal(Entity<Terminal>),
}
 503
 504impl ToolCallContent {
 505    pub fn from_acp(
 506        content: acp::ToolCallContent,
 507        language_registry: Arc<LanguageRegistry>,
 508        cx: &mut App,
 509    ) -> Self {
 510        match content {
 511            acp::ToolCallContent::Content { content } => {
 512                Self::ContentBlock(ContentBlock::new(content, &language_registry, cx))
 513            }
 514            acp::ToolCallContent::Diff { diff } => {
 515                Self::Diff(cx.new(|cx| Diff::from_acp(diff, language_registry, cx)))
 516            }
 517        }
 518    }
 519
 520    pub fn to_markdown(&self, cx: &App) -> String {
 521        match self {
 522            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 523            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 524            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 525        }
 526    }
 527}
 528
/// An update targeting an existing tool call.
#[derive(Debug, PartialEq)]
pub enum ToolCallUpdate {
    /// A protocol update carrying optional field changes.
    UpdateFields(acp::ToolCallUpdate),
    /// Replaces the call's content with a diff.
    UpdateDiff(ToolCallUpdateDiff),
    /// Replaces the call's content with a terminal.
    UpdateTerminal(ToolCallUpdateTerminal),
}
 535
 536impl ToolCallUpdate {
 537    fn id(&self) -> &acp::ToolCallId {
 538        match self {
 539            Self::UpdateFields(update) => &update.id,
 540            Self::UpdateDiff(diff) => &diff.id,
 541            Self::UpdateTerminal(terminal) => &terminal.id,
 542        }
 543    }
 544}
 545
 546impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 547    fn from(update: acp::ToolCallUpdate) -> Self {
 548        Self::UpdateFields(update)
 549    }
 550}
 551
 552impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 553    fn from(diff: ToolCallUpdateDiff) -> Self {
 554        Self::UpdateDiff(diff)
 555    }
 556}
 557
/// Update that sets a diff as the content of a tool call.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateDiff {
    /// Id of the tool call to update.
    pub id: acp::ToolCallId,
    /// Diff entity that becomes the call's content.
    pub diff: Entity<Diff>,
}
 563
 564impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 565    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 566        Self::UpdateTerminal(terminal)
 567    }
 568}
 569
/// Update that sets a terminal as the content of a tool call.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateTerminal {
    /// Id of the tool call to update.
    pub id: acp::ToolCallId,
    /// Terminal entity that becomes the call's content.
    pub terminal: Entity<Terminal>,
}
 575
/// An ordered list of plan entries; empty by default.
#[derive(Debug, Default)]
pub struct Plan {
    /// The entries of the plan.
    pub entries: Vec<PlanEntry>,
}
 580
/// Summary of a plan's entries, as computed by `Plan::stats`.
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry whose status is in progress, if any.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Number of entries whose status is pending.
    pub pending: u32,
    /// Number of entries whose status is completed.
    pub completed: u32,
}
 587
 588impl Plan {
 589    pub fn is_empty(&self) -> bool {
 590        self.entries.is_empty()
 591    }
 592
 593    pub fn stats(&self) -> PlanStats<'_> {
 594        let mut stats = PlanStats {
 595            in_progress_entry: None,
 596            pending: 0,
 597            completed: 0,
 598        };
 599
 600        for entry in &self.entries {
 601            match &entry.status {
 602                acp::PlanEntryStatus::Pending => {
 603                    stats.pending += 1;
 604                }
 605                acp::PlanEntryStatus::InProgress => {
 606                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 607                }
 608                acp::PlanEntryStatus::Completed => {
 609                    stats.completed += 1;
 610                }
 611            }
 612        }
 613
 614        stats
 615    }
 616}
 617
/// A single entry of a plan.
#[derive(Debug)]
pub struct PlanEntry {
    /// Markdown entity holding the entry's text.
    pub content: Entity<Markdown>,
    /// Priority as reported by the protocol.
    pub priority: acp::PlanEntryPriority,
    /// Status as reported by the protocol.
    pub status: acp::PlanEntryStatus,
}
 624
 625impl PlanEntry {
 626    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 627        Self {
 628            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 629            priority: entry.priority,
 630            status: entry.status,
 631        }
 632    }
 633}
 634
/// State of a single agent thread: its entries, plan, and the connection it runs over.
pub struct AcpThread {
    /// Title of the thread.
    title: SharedString,
    /// Entries of the thread, oldest first.
    entries: Vec<AgentThreadEntry>,
    /// The thread's current plan.
    plan: Plan,
    /// Project this thread operates on.
    project: Entity<Project>,
    /// Action log created for `project` when the thread is constructed.
    action_log: Entity<ActionLog>,
    /// Snapshots keyed by buffer. NOTE(review): how these are populated is not visible here.
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    /// When `Some`, `status` reports the thread as generating or waiting for confirmation.
    send_task: Option<Task<()>>,
    /// Connection to the agent.
    connection: Rc<dyn AgentConnection>,
    /// Identifier of the session this thread belongs to.
    session_id: acp::SessionId,
}
 646
/// Events emitted by `AcpThread`.
pub enum AcpThreadEvent {
    /// A new entry was pushed onto the thread.
    NewEntry,
    /// The entry at the given index changed.
    EntryUpdated(usize),
    /// Carries a range of entry indices. NOTE(review): the emit site is not visible here.
    EntriesRemoved(Range<usize>),
    /// NOTE(review): presumably emitted when a tool call needs user confirmation; confirm at
    /// the emit site.
    ToolAuthorizationRequired,
    /// NOTE(review): emit site not visible here.
    Stopped,
    /// NOTE(review): emit site not visible here.
    Error,
    /// Carries the exit status of the server process.
    ServerExited(ExitStatus),
}
 656
// Marks `AcpThread` as an emitter of `AcpThreadEvent`.
impl EventEmitter<AcpThreadEvent> for AcpThread {}
 658
/// Coarse activity state of a thread, as reported by `AcpThread::status`.
#[derive(PartialEq, Eq)]
pub enum ThreadStatus {
    /// No send task is present.
    Idle,
    /// A send task is present and the thread is waiting for tool confirmation.
    WaitingForToolConfirmation,
    /// A send task is present and the thread is not waiting for tool confirmation.
    Generating,
}
 665
/// Error describing a failure to load.
#[derive(Debug, Clone)]
pub enum LoadError {
    /// Unsupported setup; only `error_message` is used when displaying the error.
    Unsupported {
        /// Message shown when the error is displayed.
        error_message: SharedString,
        /// Message describing the upgrade.
        upgrade_message: SharedString,
        /// Command associated with the upgrade.
        upgrade_command: String,
    },
    /// The server exited; carries the status printed by `Display`.
    Exited(i32),
    /// Any other failure, described by a message.
    Other(SharedString),
}
 676
 677impl Display for LoadError {
 678    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 679        match self {
 680            LoadError::Unsupported { error_message, .. } => write!(f, "{}", error_message),
 681            LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
 682            LoadError::Other(msg) => write!(f, "{}", msg),
 683        }
 684    }
 685}
 686
// `LoadError` relies on the default `Error` methods; it exposes no source error.
impl Error for LoadError {}
 688
 689impl AcpThread {
 690    pub fn new(
 691        title: impl Into<SharedString>,
 692        connection: Rc<dyn AgentConnection>,
 693        project: Entity<Project>,
 694        session_id: acp::SessionId,
 695        cx: &mut Context<Self>,
 696    ) -> Self {
 697        let action_log = cx.new(|_| ActionLog::new(project.clone()));
 698
 699        Self {
 700            action_log,
 701            shared_buffers: Default::default(),
 702            entries: Default::default(),
 703            plan: Default::default(),
 704            title: title.into(),
 705            project,
 706            send_task: None,
 707            connection,
 708            session_id,
 709        }
 710    }
 711
    /// Returns the connection this thread uses.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }
 715
    /// Returns the thread's action log entity.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
 719
    /// Returns the project this thread operates on.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
 723
    /// Returns a clone of the thread's title.
    pub fn title(&self) -> SharedString {
        self.title.clone()
    }
 727
    /// Returns the thread's entries, oldest first.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
 731
    /// Returns the id of the session this thread belongs to.
    pub fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
 735
 736    pub fn status(&self) -> ThreadStatus {
 737        if self.send_task.is_some() {
 738            if self.waiting_for_tool_confirmation() {
 739                ThreadStatus::WaitingForToolConfirmation
 740            } else {
 741                ThreadStatus::Generating
 742            }
 743        } else {
 744            ThreadStatus::Idle
 745        }
 746    }
 747
 748    pub fn has_pending_edit_tool_calls(&self) -> bool {
 749        for entry in self.entries.iter().rev() {
 750            match entry {
 751                AgentThreadEntry::UserMessage(_) => return false,
 752                AgentThreadEntry::ToolCall(
 753                    call @ ToolCall {
 754                        status:
 755                            ToolCallStatus::Allowed {
 756                                status:
 757                                    acp::ToolCallStatus::InProgress | acp::ToolCallStatus::Pending,
 758                            },
 759                        ..
 760                    },
 761                ) if call.diffs().next().is_some() => {
 762                    return true;
 763                }
 764                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
 765            }
 766        }
 767
 768        false
 769    }
 770
 771    pub fn used_tools_since_last_user_message(&self) -> bool {
 772        for entry in self.entries.iter().rev() {
 773            match entry {
 774                AgentThreadEntry::UserMessage(..) => return false,
 775                AgentThreadEntry::AssistantMessage(..) => continue,
 776                AgentThreadEntry::ToolCall(..) => return true,
 777            }
 778        }
 779
 780        false
 781    }
 782
 783    pub fn handle_session_update(
 784        &mut self,
 785        update: acp::SessionUpdate,
 786        cx: &mut Context<Self>,
 787    ) -> Result<()> {
 788        match update {
 789            acp::SessionUpdate::UserMessageChunk { content } => {
 790                self.push_user_content_block(None, content, cx);
 791            }
 792            acp::SessionUpdate::AgentMessageChunk { content } => {
 793                self.push_assistant_content_block(content, false, cx);
 794            }
 795            acp::SessionUpdate::AgentThoughtChunk { content } => {
 796                self.push_assistant_content_block(content, true, cx);
 797            }
 798            acp::SessionUpdate::ToolCall(tool_call) => {
 799                self.upsert_tool_call(tool_call, cx);
 800            }
 801            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
 802                self.update_tool_call(tool_call_update, cx)?;
 803            }
 804            acp::SessionUpdate::Plan(plan) => {
 805                self.update_plan(plan, cx);
 806            }
 807        }
 808        Ok(())
 809    }
 810
 811    pub fn push_user_content_block(
 812        &mut self,
 813        message_id: Option<UserMessageId>,
 814        chunk: acp::ContentBlock,
 815        cx: &mut Context<Self>,
 816    ) {
 817        let language_registry = self.project.read(cx).languages().clone();
 818        let entries_len = self.entries.len();
 819
 820        if let Some(last_entry) = self.entries.last_mut()
 821            && let AgentThreadEntry::UserMessage(UserMessage {
 822                id,
 823                content,
 824                chunks,
 825                ..
 826            }) = last_entry
 827        {
 828            *id = message_id.or(id.take());
 829            content.append(chunk.clone(), &language_registry, cx);
 830            chunks.push(chunk);
 831            let idx = entries_len - 1;
 832            cx.emit(AcpThreadEvent::EntryUpdated(idx));
 833        } else {
 834            let content = ContentBlock::new(chunk.clone(), &language_registry, cx);
 835            self.push_entry(
 836                AgentThreadEntry::UserMessage(UserMessage {
 837                    id: message_id,
 838                    content,
 839                    chunks: vec![chunk],
 840                    checkpoint: None,
 841                }),
 842                cx,
 843            );
 844        }
 845    }
 846
 847    pub fn push_assistant_content_block(
 848        &mut self,
 849        chunk: acp::ContentBlock,
 850        is_thought: bool,
 851        cx: &mut Context<Self>,
 852    ) {
 853        let language_registry = self.project.read(cx).languages().clone();
 854        let entries_len = self.entries.len();
 855        if let Some(last_entry) = self.entries.last_mut()
 856            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
 857        {
 858            let idx = entries_len - 1;
 859            cx.emit(AcpThreadEvent::EntryUpdated(idx));
 860            match (chunks.last_mut(), is_thought) {
 861                (Some(AssistantMessageChunk::Message { block }), false)
 862                | (Some(AssistantMessageChunk::Thought { block }), true) => {
 863                    block.append(chunk, &language_registry, cx)
 864                }
 865                _ => {
 866                    let block = ContentBlock::new(chunk, &language_registry, cx);
 867                    if is_thought {
 868                        chunks.push(AssistantMessageChunk::Thought { block })
 869                    } else {
 870                        chunks.push(AssistantMessageChunk::Message { block })
 871                    }
 872                }
 873            }
 874        } else {
 875            let block = ContentBlock::new(chunk, &language_registry, cx);
 876            let chunk = if is_thought {
 877                AssistantMessageChunk::Thought { block }
 878            } else {
 879                AssistantMessageChunk::Message { block }
 880            };
 881
 882            self.push_entry(
 883                AgentThreadEntry::AssistantMessage(AssistantMessage {
 884                    chunks: vec![chunk],
 885                }),
 886                cx,
 887            );
 888        }
 889    }
 890
    /// Appends `entry` to the thread and emits `AcpThreadEvent::NewEntry`.
    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
 895
    /// Applies `update` to the tool call it targets and emits `EntryUpdated` for that entry.
    ///
    /// Field updates are merged into the call, and locations are re-resolved when the update
    /// carries locations. Diff and terminal updates replace the call's content entirely.
    ///
    /// # Errors
    /// Fails with the context "Tool call not found" when no tool call has the update's id.
    pub fn update_tool_call(
        &mut self,
        update: impl Into<ToolCallUpdate>,
        cx: &mut Context<Self>,
    ) -> Result<()> {
        let update = update.into();
        let languages = self.project.read(cx).languages().clone();

        let (ix, current_call) = self
            .tool_call_mut(update.id())
            .context("Tool call not found")?;
        match update {
            ToolCallUpdate::UpdateFields(update) => {
                // Record this before `update.fields` is moved into `update_fields`.
                let location_updated = update.fields.locations.is_some();
                current_call.update_fields(update.fields, languages, cx);
                if location_updated {
                    self.resolve_locations(update.id.clone(), cx);
                }
            }
            ToolCallUpdate::UpdateDiff(update) => {
                current_call.content.clear();
                current_call
                    .content
                    .push(ToolCallContent::Diff(update.diff));
            }
            ToolCallUpdate::UpdateTerminal(update) => {
                current_call.content.clear();
                current_call
                    .content
                    .push(ToolCallContent::Terminal(update.terminal));
            }
        }

        cx.emit(AcpThreadEvent::EntryUpdated(ix));

        Ok(())
    }
 933
 934    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
 935    pub fn upsert_tool_call(&mut self, tool_call: acp::ToolCall, cx: &mut Context<Self>) {
 936        let status = ToolCallStatus::Allowed {
 937            status: tool_call.status,
 938        };
 939        self.upsert_tool_call_inner(tool_call, status, cx)
 940    }
 941
 942    pub fn upsert_tool_call_inner(
 943        &mut self,
 944        tool_call: acp::ToolCall,
 945        status: ToolCallStatus,
 946        cx: &mut Context<Self>,
 947    ) {
 948        let language_registry = self.project.read(cx).languages().clone();
 949        let call = ToolCall::from_acp(tool_call, status, language_registry, cx);
 950        let id = call.id.clone();
 951
 952        if let Some((ix, current_call)) = self.tool_call_mut(&call.id) {
 953            *current_call = call;
 954
 955            cx.emit(AcpThreadEvent::EntryUpdated(ix));
 956        } else {
 957            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
 958        };
 959
 960        self.resolve_locations(id, cx);
 961    }
 962
 963    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
 964        // The tool call we are looking for is typically the last one, or very close to the end.
 965        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
 966        self.entries
 967            .iter_mut()
 968            .enumerate()
 969            .rev()
 970            .find_map(|(index, tool_call)| {
 971                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
 972                    && &tool_call.id == id
 973                {
 974                    Some((index, tool_call))
 975                } else {
 976                    None
 977                }
 978            })
 979    }
 980
 981    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
 982        let project = self.project.clone();
 983        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
 984            return;
 985        };
 986        let task = tool_call.resolve_locations(project, cx);
 987        cx.spawn(async move |this, cx| {
 988            let resolved_locations = task.await;
 989            this.update(cx, |this, cx| {
 990                let project = this.project.clone();
 991                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
 992                    return;
 993                };
 994                if let Some(Some(location)) = resolved_locations.last() {
 995                    project.update(cx, |project, cx| {
 996                        if let Some(agent_location) = project.agent_location() {
 997                            let should_ignore = agent_location.buffer == location.buffer
 998                                && location
 999                                    .buffer
1000                                    .update(cx, |buffer, _| {
1001                                        let snapshot = buffer.snapshot();
1002                                        let old_position =
1003                                            agent_location.position.to_point(&snapshot);
1004                                        let new_position = location.position.to_point(&snapshot);
1005                                        // ignore this so that when we get updates from the edit tool
1006                                        // the position doesn't reset to the startof line
1007                                        old_position.row == new_position.row
1008                                            && old_position.column > new_position.column
1009                                    })
1010                                    .ok()
1011                                    .unwrap_or_default();
1012                            if !should_ignore {
1013                                project.set_agent_location(Some(location.clone()), cx);
1014                            }
1015                        }
1016                    });
1017                }
1018                if tool_call.resolved_locations != resolved_locations {
1019                    tool_call.resolved_locations = resolved_locations;
1020                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1021                }
1022            })
1023        })
1024        .detach();
1025    }
1026
1027    pub fn request_tool_call_authorization(
1028        &mut self,
1029        tool_call: acp::ToolCall,
1030        options: Vec<acp::PermissionOption>,
1031        cx: &mut Context<Self>,
1032    ) -> oneshot::Receiver<acp::PermissionOptionId> {
1033        let (tx, rx) = oneshot::channel();
1034
1035        let status = ToolCallStatus::WaitingForConfirmation {
1036            options,
1037            respond_tx: tx,
1038        };
1039
1040        self.upsert_tool_call_inner(tool_call, status, cx);
1041        cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1042        rx
1043    }
1044
1045    pub fn authorize_tool_call(
1046        &mut self,
1047        id: acp::ToolCallId,
1048        option_id: acp::PermissionOptionId,
1049        option_kind: acp::PermissionOptionKind,
1050        cx: &mut Context<Self>,
1051    ) {
1052        let Some((ix, call)) = self.tool_call_mut(&id) else {
1053            return;
1054        };
1055
1056        let new_status = match option_kind {
1057            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1058                ToolCallStatus::Rejected
1059            }
1060            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1061                ToolCallStatus::Allowed {
1062                    status: acp::ToolCallStatus::InProgress,
1063                }
1064            }
1065        };
1066
1067        let curr_status = mem::replace(&mut call.status, new_status);
1068
1069        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1070            respond_tx.send(option_id).log_err();
1071        } else if cfg!(debug_assertions) {
1072            panic!("tried to authorize an already authorized tool call");
1073        }
1074
1075        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1076    }
1077
1078    /// Returns true if the last turn is awaiting tool authorization
1079    pub fn waiting_for_tool_confirmation(&self) -> bool {
1080        for entry in self.entries.iter().rev() {
1081            match &entry {
1082                AgentThreadEntry::ToolCall(call) => match call.status {
1083                    ToolCallStatus::WaitingForConfirmation { .. } => return true,
1084                    ToolCallStatus::Allowed { .. }
1085                    | ToolCallStatus::Rejected
1086                    | ToolCallStatus::Canceled => continue,
1087                },
1088                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1089                    // Reached the beginning of the turn
1090                    return false;
1091                }
1092            }
1093        }
1094        false
1095    }
1096
    /// Returns a reference to this thread's current plan.
    pub fn plan(&self) -> &Plan {
        &self.plan
    }
1100
1101    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1102        let new_entries_len = request.entries.len();
1103        let mut new_entries = request.entries.into_iter();
1104
1105        // Reuse existing markdown to prevent flickering
1106        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1107            let PlanEntry {
1108                content,
1109                priority,
1110                status,
1111            } = old;
1112            content.update(cx, |old, cx| {
1113                old.replace(new.content, cx);
1114            });
1115            *priority = new.priority;
1116            *status = new.status;
1117        }
1118        for new in new_entries {
1119            self.plan.entries.push(PlanEntry::from_acp(new, cx))
1120        }
1121        self.plan.entries.truncate(new_entries_len);
1122
1123        cx.notify();
1124    }
1125
1126    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1127        self.plan
1128            .entries
1129            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1130        cx.notify();
1131    }
1132
1133    #[cfg(any(test, feature = "test-support"))]
1134    pub fn send_raw(
1135        &mut self,
1136        message: &str,
1137        cx: &mut Context<Self>,
1138    ) -> BoxFuture<'static, Result<()>> {
1139        self.send(
1140            vec![acp::ContentBlock::Text(acp::TextContent {
1141                text: message.to_string(),
1142                annotations: None,
1143            })],
1144            cx,
1145        )
1146    }
1147
1148    pub fn send(
1149        &mut self,
1150        message: Vec<acp::ContentBlock>,
1151        cx: &mut Context<Self>,
1152    ) -> BoxFuture<'static, Result<()>> {
1153        let block = ContentBlock::new_combined(
1154            message.clone(),
1155            self.project.read(cx).languages().clone(),
1156            cx,
1157        );
1158        let request = acp::PromptRequest {
1159            prompt: message.clone(),
1160            session_id: self.session_id.clone(),
1161        };
1162        let git_store = self.project.read(cx).git_store().clone();
1163
1164        let message_id = if self
1165            .connection
1166            .session_editor(&self.session_id, cx)
1167            .is_some()
1168        {
1169            Some(UserMessageId::new())
1170        } else {
1171            None
1172        };
1173        self.push_entry(
1174            AgentThreadEntry::UserMessage(UserMessage {
1175                id: message_id.clone(),
1176                content: block,
1177                chunks: message,
1178                checkpoint: None,
1179            }),
1180            cx,
1181        );
1182
1183        self.run_turn(cx, async move |this, cx| {
1184            let old_checkpoint = git_store
1185                .update(cx, |git, cx| git.checkpoint(cx))?
1186                .await
1187                .context("failed to get old checkpoint")
1188                .log_err();
1189            this.update(cx, |this, cx| {
1190                if let Some((_ix, message)) = this.last_user_message() {
1191                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1192                        git_checkpoint,
1193                        show: false,
1194                    });
1195                }
1196                this.connection.prompt(message_id, request, cx)
1197            })?
1198            .await
1199        })
1200    }
1201
1202    pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1203        self.run_turn(cx, async move |this, cx| {
1204            this.update(cx, |this, cx| {
1205                this.connection
1206                    .resume(&this.session_id, cx)
1207                    .map(|resume| resume.run(cx))
1208            })?
1209            .context("resuming a session is not supported")?
1210            .await
1211        })
1212    }
1213
    /// Runs one agent turn driven by `f` and returns a future that resolves when
    /// the turn has ended.
    ///
    /// Completed plan entries are cleared first, and any turn already in flight
    /// is cancelled via [`Self::cancel`]; `f` only starts once that cancellation
    /// task has finished. The task running `f` is stored in `send_task`.
    ///
    /// When the turn ends, the last checkpoint is refreshed and then:
    /// * if `f` returned an error, `send_task` is cleared,
    ///   [`AcpThreadEvent::Error`] is emitted and the error is returned;
    /// * otherwise [`AcpThreadEvent::Stopped`] is emitted and `Ok(())` is
    ///   returned. This arm also covers the case where the result channel was
    ///   dropped without a value being sent.
    fn run_turn(
        &mut self,
        cx: &mut Context<Self>,
        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
    ) -> BoxFuture<'static, Result<()>> {
        self.clear_completed_plan_entries(cx);

        // `tx`/`rx` carry the result of `f` from the send task to the task below.
        let (tx, rx) = oneshot::channel();
        let cancel_task = self.cancel(cx);

        self.send_task = Some(cx.spawn(async move |this, cx| {
            // Let the previous turn finish cancelling before starting this one.
            cancel_task.await;
            tx.send(f(this, cx).await).ok();
        }));

        cx.spawn(async move |this, cx| {
            let response = rx.await;

            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
                .await?;

            this.update(cx, |this, cx| {
                match response {
                    Ok(Err(e)) => {
                        this.send_task.take();
                        cx.emit(AcpThreadEvent::Error);
                        Err(e)
                    }
                    result => {
                        let cancelled = matches!(
                            result,
                            Ok(Ok(acp::PromptResponse {
                                stop_reason: acp::StopReason::Cancelled
                            }))
                        );

                        // We only take the task if the current prompt wasn't cancelled.
                        //
                        // This prompt may have been cancelled because another one was sent
                        // while it was still generating. In these cases, dropping `send_task`
                        // would cause the next generation to be cancelled.
                        if !cancelled {
                            this.send_task.take();
                        }

                        cx.emit(AcpThreadEvent::Stopped);
                        Ok(())
                    }
                }
            })?
        })
        .boxed()
    }
1267
1268    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1269        let Some(send_task) = self.send_task.take() else {
1270            return Task::ready(());
1271        };
1272
1273        for entry in self.entries.iter_mut() {
1274            if let AgentThreadEntry::ToolCall(call) = entry {
1275                let cancel = matches!(
1276                    call.status,
1277                    ToolCallStatus::WaitingForConfirmation { .. }
1278                        | ToolCallStatus::Allowed {
1279                            status: acp::ToolCallStatus::InProgress
1280                        }
1281                );
1282
1283                if cancel {
1284                    call.status = ToolCallStatus::Canceled;
1285                }
1286            }
1287        }
1288
1289        self.connection.cancel(&self.session_id, cx);
1290
1291        // Wait for the send task to complete
1292        cx.foreground_executor().spawn(send_task)
1293    }
1294
1295    /// Rewinds this thread to before the entry at `index`, removing it and all
1296    /// subsequent entries while reverting any changes made from that point.
1297    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1298        let Some(session_editor) = self.connection.session_editor(&self.session_id, cx) else {
1299            return Task::ready(Err(anyhow!("not supported")));
1300        };
1301        let Some(message) = self.user_message(&id) else {
1302            return Task::ready(Err(anyhow!("message not found")));
1303        };
1304
1305        let checkpoint = message
1306            .checkpoint
1307            .as_ref()
1308            .map(|c| c.git_checkpoint.clone());
1309
1310        let git_store = self.project.read(cx).git_store().clone();
1311        cx.spawn(async move |this, cx| {
1312            if let Some(checkpoint) = checkpoint {
1313                git_store
1314                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1315                    .await?;
1316            }
1317
1318            cx.update(|cx| session_editor.truncate(id.clone(), cx))?
1319                .await?;
1320            this.update(cx, |this, cx| {
1321                if let Some((ix, _)) = this.user_message_mut(&id) {
1322                    let range = ix..this.entries.len();
1323                    this.entries.truncate(ix);
1324                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
1325                }
1326            })
1327        })
1328    }
1329
1330    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1331        let git_store = self.project.read(cx).git_store().clone();
1332
1333        let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1334            if let Some(checkpoint) = message.checkpoint.as_ref() {
1335                checkpoint.git_checkpoint.clone()
1336            } else {
1337                return Task::ready(Ok(()));
1338            }
1339        } else {
1340            return Task::ready(Ok(()));
1341        };
1342
1343        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1344        cx.spawn(async move |this, cx| {
1345            let new_checkpoint = new_checkpoint
1346                .await
1347                .context("failed to get new checkpoint")
1348                .log_err();
1349            if let Some(new_checkpoint) = new_checkpoint {
1350                let equal = git_store
1351                    .update(cx, |git, cx| {
1352                        git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1353                    })?
1354                    .await
1355                    .unwrap_or(true);
1356                this.update(cx, |this, cx| {
1357                    let (ix, message) = this.last_user_message().context("no user message")?;
1358                    let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1359                    checkpoint.show = !equal;
1360                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1361                    anyhow::Ok(())
1362                })??;
1363            }
1364
1365            Ok(())
1366        })
1367    }
1368
1369    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1370        self.entries
1371            .iter_mut()
1372            .enumerate()
1373            .rev()
1374            .find_map(|(ix, entry)| {
1375                if let AgentThreadEntry::UserMessage(message) = entry {
1376                    Some((ix, message))
1377                } else {
1378                    None
1379                }
1380            })
1381    }
1382
1383    fn user_message(&self, id: &UserMessageId) -> Option<&UserMessage> {
1384        self.entries.iter().find_map(|entry| {
1385            if let AgentThreadEntry::UserMessage(message) = entry {
1386                if message.id.as_ref() == Some(&id) {
1387                    Some(message)
1388                } else {
1389                    None
1390                }
1391            } else {
1392                None
1393            }
1394        })
1395    }
1396
1397    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1398        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1399            if let AgentThreadEntry::UserMessage(message) = entry {
1400                if message.id.as_ref() == Some(&id) {
1401                    Some((ix, message))
1402                } else {
1403                    None
1404                }
1405            } else {
1406                None
1407            }
1408        })
1409    }
1410
1411    pub fn read_text_file(
1412        &self,
1413        path: PathBuf,
1414        line: Option<u32>,
1415        limit: Option<u32>,
1416        reuse_shared_snapshot: bool,
1417        cx: &mut Context<Self>,
1418    ) -> Task<Result<String>> {
1419        let project = self.project.clone();
1420        let action_log = self.action_log.clone();
1421        cx.spawn(async move |this, cx| {
1422            let load = project.update(cx, |project, cx| {
1423                let path = project
1424                    .project_path_for_absolute_path(&path, cx)
1425                    .context("invalid path")?;
1426                anyhow::Ok(project.open_buffer(path, cx))
1427            });
1428            let buffer = load??.await?;
1429
1430            let snapshot = if reuse_shared_snapshot {
1431                this.read_with(cx, |this, _| {
1432                    this.shared_buffers.get(&buffer.clone()).cloned()
1433                })
1434                .log_err()
1435                .flatten()
1436            } else {
1437                None
1438            };
1439
1440            let snapshot = if let Some(snapshot) = snapshot {
1441                snapshot
1442            } else {
1443                action_log.update(cx, |action_log, cx| {
1444                    action_log.buffer_read(buffer.clone(), cx);
1445                })?;
1446                project.update(cx, |project, cx| {
1447                    let position = buffer
1448                        .read(cx)
1449                        .snapshot()
1450                        .anchor_before(Point::new(line.unwrap_or_default(), 0));
1451                    project.set_agent_location(
1452                        Some(AgentLocation {
1453                            buffer: buffer.downgrade(),
1454                            position,
1455                        }),
1456                        cx,
1457                    );
1458                })?;
1459
1460                buffer.update(cx, |buffer, _| buffer.snapshot())?
1461            };
1462
1463            this.update(cx, |this, _| {
1464                let text = snapshot.text();
1465                this.shared_buffers.insert(buffer.clone(), snapshot);
1466                if line.is_none() && limit.is_none() {
1467                    return Ok(text);
1468                }
1469                let limit = limit.unwrap_or(u32::MAX) as usize;
1470                let Some(line) = line else {
1471                    return Ok(text.lines().take(limit).collect::<String>());
1472                };
1473
1474                let count = text.lines().count();
1475                if count < line as usize {
1476                    anyhow::bail!("There are only {} lines", count);
1477                }
1478                Ok(text
1479                    .lines()
1480                    .skip(line as usize + 1)
1481                    .take(limit)
1482                    .collect::<String>())
1483            })?
1484        })
1485    }
1486
1487    pub fn write_text_file(
1488        &self,
1489        path: PathBuf,
1490        content: String,
1491        cx: &mut Context<Self>,
1492    ) -> Task<Result<()>> {
1493        let project = self.project.clone();
1494        let action_log = self.action_log.clone();
1495        cx.spawn(async move |this, cx| {
1496            let load = project.update(cx, |project, cx| {
1497                let path = project
1498                    .project_path_for_absolute_path(&path, cx)
1499                    .context("invalid path")?;
1500                anyhow::Ok(project.open_buffer(path, cx))
1501            });
1502            let buffer = load??.await?;
1503            let snapshot = this.update(cx, |this, cx| {
1504                this.shared_buffers
1505                    .get(&buffer)
1506                    .cloned()
1507                    .unwrap_or_else(|| buffer.read(cx).snapshot())
1508            })?;
1509            let edits = cx
1510                .background_executor()
1511                .spawn(async move {
1512                    let old_text = snapshot.text();
1513                    text_diff(old_text.as_str(), &content)
1514                        .into_iter()
1515                        .map(|(range, replacement)| {
1516                            (
1517                                snapshot.anchor_after(range.start)
1518                                    ..snapshot.anchor_before(range.end),
1519                                replacement,
1520                            )
1521                        })
1522                        .collect::<Vec<_>>()
1523                })
1524                .await;
1525            cx.update(|cx| {
1526                project.update(cx, |project, cx| {
1527                    project.set_agent_location(
1528                        Some(AgentLocation {
1529                            buffer: buffer.downgrade(),
1530                            position: edits
1531                                .last()
1532                                .map(|(range, _)| range.end)
1533                                .unwrap_or(Anchor::MIN),
1534                        }),
1535                        cx,
1536                    );
1537                });
1538
1539                action_log.update(cx, |action_log, cx| {
1540                    action_log.buffer_read(buffer.clone(), cx);
1541                });
1542                buffer.update(cx, |buffer, cx| {
1543                    buffer.edit(edits, None, cx);
1544                });
1545                action_log.update(cx, |action_log, cx| {
1546                    action_log.buffer_edited(buffer.clone(), cx);
1547                });
1548            })?;
1549            project
1550                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
1551                .await
1552        })
1553    }
1554
1555    pub fn to_markdown(&self, cx: &App) -> String {
1556        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
1557    }
1558
    /// Emits [`AcpThreadEvent::ServerExited`] carrying the given exit `status`.
    pub fn emit_server_exited(&mut self, status: ExitStatus, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::ServerExited(status));
    }
1562}
1563
1564fn markdown_for_raw_output(
1565    raw_output: &serde_json::Value,
1566    language_registry: &Arc<LanguageRegistry>,
1567    cx: &mut App,
1568) -> Option<Entity<Markdown>> {
1569    match raw_output {
1570        serde_json::Value::Null => None,
1571        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
1572            Markdown::new(
1573                value.to_string().into(),
1574                Some(language_registry.clone()),
1575                None,
1576                cx,
1577            )
1578        })),
1579        serde_json::Value::Number(value) => Some(cx.new(|cx| {
1580            Markdown::new(
1581                value.to_string().into(),
1582                Some(language_registry.clone()),
1583                None,
1584                cx,
1585            )
1586        })),
1587        serde_json::Value::String(value) => Some(cx.new(|cx| {
1588            Markdown::new(
1589                value.clone().into(),
1590                Some(language_registry.clone()),
1591                None,
1592                cx,
1593            )
1594        })),
1595        value => Some(cx.new(|cx| {
1596            Markdown::new(
1597                format!("```json\n{}\n```", value).into(),
1598                Some(language_registry.clone()),
1599                None,
1600                cx,
1601            )
1602        })),
1603    }
1604}
1605
1606#[cfg(test)]
1607mod tests {
1608    use super::*;
1609    use anyhow::anyhow;
1610    use futures::{channel::mpsc, future::LocalBoxFuture, select};
1611    use gpui::{AsyncApp, TestAppContext, WeakEntity};
1612    use indoc::indoc;
1613    use project::{FakeFs, Fs};
1614    use rand::Rng as _;
1615    use serde_json::json;
1616    use settings::SettingsStore;
1617    use smol::stream::StreamExt as _;
1618    use std::{
1619        any::Any,
1620        cell::RefCell,
1621        path::Path,
1622        rc::Rc,
1623        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
1624        time::Duration,
1625    };
1626    use util::path;
1627
    /// Shared test setup: initializes logging (ignoring the error if a logger is
    /// already installed), installs a test `SettingsStore` as a global, and
    /// initializes project settings and the language crate.
    fn init_test(cx: &mut TestAppContext) {
        env_logger::try_init().ok();
        cx.update(|cx| {
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
            Project::init_settings(cx);
            language::init(cx);
        });
    }
1637
    /// Checks how `push_user_content_block` groups chunks into user messages:
    /// a first chunk creates a message, a following chunk is appended to it (and
    /// its id is taken over), and a chunk arriving after an assistant message
    /// starts a new user message.
    #[gpui::test]
    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let connection = Rc::new(FakeAgentConnection::new());
        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
            .await
            .unwrap();

        // Test creating a new user message
        thread.update(cx, |thread, cx| {
            thread.push_user_content_block(
                None,
                acp::ContentBlock::Text(acp::TextContent {
                    annotations: None,
                    text: "Hello, ".to_string(),
                }),
                cx,
            );
        });

        thread.update(cx, |thread, cx| {
            assert_eq!(thread.entries.len(), 1);
            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
                assert_eq!(user_msg.id, None);
                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
            } else {
                panic!("Expected UserMessage");
            }
        });

        // Test appending to existing user message
        let message_1_id = UserMessageId::new();
        thread.update(cx, |thread, cx| {
            thread.push_user_content_block(
                Some(message_1_id.clone()),
                acp::ContentBlock::Text(acp::TextContent {
                    annotations: None,
                    text: "world!".to_string(),
                }),
                cx,
            );
        });

        // Still a single entry: the chunk was merged and the id was adopted.
        thread.update(cx, |thread, cx| {
            assert_eq!(thread.entries.len(), 1);
            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
                assert_eq!(user_msg.id, Some(message_1_id));
                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
            } else {
                panic!("Expected UserMessage");
            }
        });

        // Test creating new user message after assistant message
        thread.update(cx, |thread, cx| {
            thread.push_assistant_content_block(
                acp::ContentBlock::Text(acp::TextContent {
                    annotations: None,
                    text: "Assistant response".to_string(),
                }),
                false,
                cx,
            );
        });

        let message_2_id = UserMessageId::new();
        thread.update(cx, |thread, cx| {
            thread.push_user_content_block(
                Some(message_2_id.clone()),
                acp::ContentBlock::Text(acp::TextContent {
                    annotations: None,
                    text: "New user message".to_string(),
                }),
                cx,
            );
        });

        // Expect user, assistant, user: the new chunk must not be merged into
        // the first user message.
        thread.update(cx, |thread, cx| {
            assert_eq!(thread.entries.len(), 3);
            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
                assert_eq!(user_msg.id, Some(message_2_id));
                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
            } else {
                panic!("Expected UserMessage at index 2");
            }
        });
    }
1729
    /// Checks that consecutive `AgentThoughtChunk` session updates are joined
    /// into a single `<thinking>` block in the thread's markdown output.
    #[gpui::test]
    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        // The fake agent answers every user message with two thought chunks and
        // then ends the turn.
        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
            |_, thread, mut cx| {
                async move {
                    thread.update(&mut cx, |thread, cx| {
                        thread
                            .handle_session_update(
                                acp::SessionUpdate::AgentThoughtChunk {
                                    content: "Thinking ".into(),
                                },
                                cx,
                            )
                            .unwrap();
                        thread
                            .handle_session_update(
                                acp::SessionUpdate::AgentThoughtChunk {
                                    content: "hard!".into(),
                                },
                                cx,
                            )
                            .unwrap();
                    })?;
                    Ok(acp::PromptResponse {
                        stop_reason: acp::StopReason::EndTurn,
                    })
                }
                .boxed_local()
            },
        ));

        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
            .await
            .unwrap();

        thread
            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
            .await
            .unwrap();

        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
        assert_eq!(
            output,
            indoc! {r#"
            ## User

            Hello from Zed!

            ## Assistant

            <thinking>
            Thinking hard!
            </thinking>

            "#}
        );
    }
1792
1793    #[gpui::test]
1794    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
1795        init_test(cx);
1796
1797        let fs = FakeFs::new(cx.executor());
1798        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
1799            .await;
1800        let project = Project::test(fs.clone(), [], cx).await;
1801        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
1802        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
1803        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1804            move |_, thread, mut cx| {
1805                let read_file_tx = read_file_tx.clone();
1806                async move {
1807                    let content = thread
1808                        .update(&mut cx, |thread, cx| {
1809                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
1810                        })
1811                        .unwrap()
1812                        .await
1813                        .unwrap();
1814                    assert_eq!(content, "one\ntwo\nthree\n");
1815                    read_file_tx.take().unwrap().send(()).unwrap();
1816                    thread
1817                        .update(&mut cx, |thread, cx| {
1818                            thread.write_text_file(
1819                                path!("/tmp/foo").into(),
1820                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
1821                                cx,
1822                            )
1823                        })
1824                        .unwrap()
1825                        .await
1826                        .unwrap();
1827                    Ok(acp::PromptResponse {
1828                        stop_reason: acp::StopReason::EndTurn,
1829                    })
1830                }
1831                .boxed_local()
1832            },
1833        ));
1834
1835        let (worktree, pathbuf) = project
1836            .update(cx, |project, cx| {
1837                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
1838            })
1839            .await
1840            .unwrap();
1841        let buffer = project
1842            .update(cx, |project, cx| {
1843                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
1844            })
1845            .await
1846            .unwrap();
1847
1848        let thread = cx
1849            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
1850            .await
1851            .unwrap();
1852
1853        let request = thread.update(cx, |thread, cx| {
1854            thread.send_raw("Extend the count in /tmp/foo", cx)
1855        });
1856        read_file_rx.await.ok();
1857        buffer.update(cx, |buffer, cx| {
1858            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
1859        });
1860        cx.run_until_parked();
1861        assert_eq!(
1862            buffer.read_with(cx, |buffer, _| buffer.text()),
1863            "zero\none\ntwo\nthree\nfour\nfive\n"
1864        );
1865        assert_eq!(
1866            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
1867            "zero\none\ntwo\nthree\nfour\nfive\n"
1868        );
1869        request.await.unwrap();
1870    }
1871
1872    #[gpui::test]
1873    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
1874        init_test(cx);
1875
1876        let fs = FakeFs::new(cx.executor());
1877        let project = Project::test(fs, [], cx).await;
1878        let id = acp::ToolCallId("test".into());
1879
1880        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
1881            let id = id.clone();
1882            move |_, thread, mut cx| {
1883                let id = id.clone();
1884                async move {
1885                    thread
1886                        .update(&mut cx, |thread, cx| {
1887                            thread.handle_session_update(
1888                                acp::SessionUpdate::ToolCall(acp::ToolCall {
1889                                    id: id.clone(),
1890                                    title: "Label".into(),
1891                                    kind: acp::ToolKind::Fetch,
1892                                    status: acp::ToolCallStatus::InProgress,
1893                                    content: vec![],
1894                                    locations: vec![],
1895                                    raw_input: None,
1896                                    raw_output: None,
1897                                }),
1898                                cx,
1899                            )
1900                        })
1901                        .unwrap()
1902                        .unwrap();
1903                    Ok(acp::PromptResponse {
1904                        stop_reason: acp::StopReason::EndTurn,
1905                    })
1906                }
1907                .boxed_local()
1908            }
1909        }));
1910
1911        let thread = cx
1912            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1913            .await
1914            .unwrap();
1915
1916        let request = thread.update(cx, |thread, cx| {
1917            thread.send_raw("Fetch https://example.com", cx)
1918        });
1919
1920        run_until_first_tool_call(&thread, cx).await;
1921
1922        thread.read_with(cx, |thread, _| {
1923            assert!(matches!(
1924                thread.entries[1],
1925                AgentThreadEntry::ToolCall(ToolCall {
1926                    status: ToolCallStatus::Allowed {
1927                        status: acp::ToolCallStatus::InProgress,
1928                        ..
1929                    },
1930                    ..
1931                })
1932            ));
1933        });
1934
1935        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
1936
1937        thread.read_with(cx, |thread, _| {
1938            assert!(matches!(
1939                &thread.entries[1],
1940                AgentThreadEntry::ToolCall(ToolCall {
1941                    status: ToolCallStatus::Canceled,
1942                    ..
1943                })
1944            ));
1945        });
1946
1947        thread
1948            .update(cx, |thread, cx| {
1949                thread.handle_session_update(
1950                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
1951                        id,
1952                        fields: acp::ToolCallUpdateFields {
1953                            status: Some(acp::ToolCallStatus::Completed),
1954                            ..Default::default()
1955                        },
1956                    }),
1957                    cx,
1958                )
1959            })
1960            .unwrap();
1961
1962        request.await.unwrap();
1963
1964        thread.read_with(cx, |thread, _| {
1965            assert!(matches!(
1966                thread.entries[1],
1967                AgentThreadEntry::ToolCall(ToolCall {
1968                    status: ToolCallStatus::Allowed {
1969                        status: acp::ToolCallStatus::Completed,
1970                        ..
1971                    },
1972                    ..
1973                })
1974            ));
1975        });
1976    }
1977
1978    #[gpui::test]
1979    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
1980        init_test(cx);
1981        let fs = FakeFs::new(cx.background_executor.clone());
1982        fs.insert_tree(path!("/test"), json!({})).await;
1983        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
1984
1985        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
1986            move |_, thread, mut cx| {
1987                async move {
1988                    thread
1989                        .update(&mut cx, |thread, cx| {
1990                            thread.handle_session_update(
1991                                acp::SessionUpdate::ToolCall(acp::ToolCall {
1992                                    id: acp::ToolCallId("test".into()),
1993                                    title: "Label".into(),
1994                                    kind: acp::ToolKind::Edit,
1995                                    status: acp::ToolCallStatus::Completed,
1996                                    content: vec![acp::ToolCallContent::Diff {
1997                                        diff: acp::Diff {
1998                                            path: "/test/test.txt".into(),
1999                                            old_text: None,
2000                                            new_text: "foo".into(),
2001                                        },
2002                                    }],
2003                                    locations: vec![],
2004                                    raw_input: None,
2005                                    raw_output: None,
2006                                }),
2007                                cx,
2008                            )
2009                        })
2010                        .unwrap()
2011                        .unwrap();
2012                    Ok(acp::PromptResponse {
2013                        stop_reason: acp::StopReason::EndTurn,
2014                    })
2015                }
2016                .boxed_local()
2017            }
2018        }));
2019
2020        let thread = cx
2021            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2022            .await
2023            .unwrap();
2024
2025        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
2026            .await
2027            .unwrap();
2028
2029        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
2030    }
2031
2032    #[gpui::test(iterations = 10)]
2033    async fn test_checkpoints(cx: &mut TestAppContext) {
2034        init_test(cx);
2035        let fs = FakeFs::new(cx.background_executor.clone());
2036        fs.insert_tree(
2037            path!("/test"),
2038            json!({
2039                ".git": {}
2040            }),
2041        )
2042        .await;
2043        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
2044
2045        let simulate_changes = Arc::new(AtomicBool::new(true));
2046        let next_filename = Arc::new(AtomicUsize::new(0));
2047        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2048            let simulate_changes = simulate_changes.clone();
2049            let next_filename = next_filename.clone();
2050            let fs = fs.clone();
2051            move |request, thread, mut cx| {
2052                let fs = fs.clone();
2053                let simulate_changes = simulate_changes.clone();
2054                let next_filename = next_filename.clone();
2055                async move {
2056                    if simulate_changes.load(SeqCst) {
2057                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
2058                        fs.write(Path::new(&filename), b"").await?;
2059                    }
2060
2061                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
2062                        panic!("expected text content block");
2063                    };
2064                    thread.update(&mut cx, |thread, cx| {
2065                        thread
2066                            .handle_session_update(
2067                                acp::SessionUpdate::AgentMessageChunk {
2068                                    content: content.text.to_uppercase().into(),
2069                                },
2070                                cx,
2071                            )
2072                            .unwrap();
2073                    })?;
2074                    Ok(acp::PromptResponse {
2075                        stop_reason: acp::StopReason::EndTurn,
2076                    })
2077                }
2078                .boxed_local()
2079            }
2080        }));
2081        let thread = cx
2082            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2083            .await
2084            .unwrap();
2085
2086        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
2087            .await
2088            .unwrap();
2089        thread.read_with(cx, |thread, cx| {
2090            assert_eq!(
2091                thread.to_markdown(cx),
2092                indoc! {"
2093                    ## User (checkpoint)
2094
2095                    Lorem
2096
2097                    ## Assistant
2098
2099                    LOREM
2100
2101                "}
2102            );
2103        });
2104        assert_eq!(fs.files(), vec![Path::new("/test/file-0")]);
2105
2106        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
2107            .await
2108            .unwrap();
2109        thread.read_with(cx, |thread, cx| {
2110            assert_eq!(
2111                thread.to_markdown(cx),
2112                indoc! {"
2113                    ## User (checkpoint)
2114
2115                    Lorem
2116
2117                    ## Assistant
2118
2119                    LOREM
2120
2121                    ## User (checkpoint)
2122
2123                    ipsum
2124
2125                    ## Assistant
2126
2127                    IPSUM
2128
2129                "}
2130            );
2131        });
2132        assert_eq!(
2133            fs.files(),
2134            vec![Path::new("/test/file-0"), Path::new("/test/file-1")]
2135        );
2136
2137        // Checkpoint isn't stored when there are no changes.
2138        simulate_changes.store(false, SeqCst);
2139        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
2140            .await
2141            .unwrap();
2142        thread.read_with(cx, |thread, cx| {
2143            assert_eq!(
2144                thread.to_markdown(cx),
2145                indoc! {"
2146                    ## User (checkpoint)
2147
2148                    Lorem
2149
2150                    ## Assistant
2151
2152                    LOREM
2153
2154                    ## User (checkpoint)
2155
2156                    ipsum
2157
2158                    ## Assistant
2159
2160                    IPSUM
2161
2162                    ## User
2163
2164                    dolor
2165
2166                    ## Assistant
2167
2168                    DOLOR
2169
2170                "}
2171            );
2172        });
2173        assert_eq!(
2174            fs.files(),
2175            vec![Path::new("/test/file-0"), Path::new("/test/file-1")]
2176        );
2177
2178        // Rewinding the conversation truncates the history and restores the checkpoint.
2179        thread
2180            .update(cx, |thread, cx| {
2181                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
2182                    panic!("unexpected entries {:?}", thread.entries)
2183                };
2184                thread.rewind(message.id.clone().unwrap(), cx)
2185            })
2186            .await
2187            .unwrap();
2188        thread.read_with(cx, |thread, cx| {
2189            assert_eq!(
2190                thread.to_markdown(cx),
2191                indoc! {"
2192                    ## User (checkpoint)
2193
2194                    Lorem
2195
2196                    ## Assistant
2197
2198                    LOREM
2199
2200                "}
2201            );
2202        });
2203        assert_eq!(fs.files(), vec![Path::new("/test/file-0")]);
2204    }
2205
2206    async fn run_until_first_tool_call(
2207        thread: &Entity<AcpThread>,
2208        cx: &mut TestAppContext,
2209    ) -> usize {
2210        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
2211
2212        let subscription = cx.update(|cx| {
2213            cx.subscribe(thread, move |thread, _, cx| {
2214                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
2215                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
2216                        return tx.try_send(ix).unwrap();
2217                    }
2218                }
2219            })
2220        });
2221
2222        select! {
2223            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
2224                panic!("Timeout waiting for tool call")
2225            }
2226            ix = rx.next().fuse() => {
2227                drop(subscription);
2228                ix.unwrap()
2229            }
2230        }
2231    }
2232
2233    #[derive(Clone, Default)]
2234    struct FakeAgentConnection {
2235        auth_methods: Vec<acp::AuthMethod>,
2236        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
2237        on_user_message: Option<
2238            Rc<
2239                dyn Fn(
2240                        acp::PromptRequest,
2241                        WeakEntity<AcpThread>,
2242                        AsyncApp,
2243                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2244                    + 'static,
2245            >,
2246        >,
2247    }
2248
2249    impl FakeAgentConnection {
2250        fn new() -> Self {
2251            Self {
2252                auth_methods: Vec::new(),
2253                on_user_message: None,
2254                sessions: Arc::default(),
2255            }
2256        }
2257
2258        #[expect(unused)]
2259        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
2260            self.auth_methods = auth_methods;
2261            self
2262        }
2263
2264        fn on_user_message(
2265            mut self,
2266            handler: impl Fn(
2267                acp::PromptRequest,
2268                WeakEntity<AcpThread>,
2269                AsyncApp,
2270            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2271            + 'static,
2272        ) -> Self {
2273            self.on_user_message.replace(Rc::new(handler));
2274            self
2275        }
2276    }
2277
2278    impl AgentConnection for FakeAgentConnection {
2279        fn auth_methods(&self) -> &[acp::AuthMethod] {
2280            &self.auth_methods
2281        }
2282
2283        fn new_thread(
2284            self: Rc<Self>,
2285            project: Entity<Project>,
2286            _cwd: &Path,
2287            cx: &mut gpui::App,
2288        ) -> Task<gpui::Result<Entity<AcpThread>>> {
2289            let session_id = acp::SessionId(
2290                rand::thread_rng()
2291                    .sample_iter(&rand::distributions::Alphanumeric)
2292                    .take(7)
2293                    .map(char::from)
2294                    .collect::<String>()
2295                    .into(),
2296            );
2297            let thread =
2298                cx.new(|cx| AcpThread::new("Test", self.clone(), project, session_id.clone(), cx));
2299            self.sessions.lock().insert(session_id, thread.downgrade());
2300            Task::ready(Ok(thread))
2301        }
2302
2303        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
2304            if self.auth_methods().iter().any(|m| m.id == method) {
2305                Task::ready(Ok(()))
2306            } else {
2307                Task::ready(Err(anyhow!("Invalid Auth Method")))
2308            }
2309        }
2310
2311        fn prompt(
2312            &self,
2313            _id: Option<UserMessageId>,
2314            params: acp::PromptRequest,
2315            cx: &mut App,
2316        ) -> Task<gpui::Result<acp::PromptResponse>> {
2317            let sessions = self.sessions.lock();
2318            let thread = sessions.get(&params.session_id).unwrap();
2319            if let Some(handler) = &self.on_user_message {
2320                let handler = handler.clone();
2321                let thread = thread.clone();
2322                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
2323            } else {
2324                Task::ready(Ok(acp::PromptResponse {
2325                    stop_reason: acp::StopReason::EndTurn,
2326                }))
2327            }
2328        }
2329
2330        fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
2331            let sessions = self.sessions.lock();
2332            let thread = sessions.get(&session_id).unwrap().clone();
2333
2334            cx.spawn(async move |cx| {
2335                thread
2336                    .update(cx, |thread, cx| thread.cancel(cx))
2337                    .unwrap()
2338                    .await
2339            })
2340            .detach();
2341        }
2342
2343        fn session_editor(
2344            &self,
2345            session_id: &acp::SessionId,
2346            _cx: &mut App,
2347        ) -> Option<Rc<dyn AgentSessionEditor>> {
2348            Some(Rc::new(FakeAgentSessionEditor {
2349                _session_id: session_id.clone(),
2350            }))
2351        }
2352
2353        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
2354            self
2355        }
2356    }
2357
2358    struct FakeAgentSessionEditor {
2359        _session_id: acp::SessionId,
2360    }
2361
2362    impl AgentSessionEditor for FakeAgentSessionEditor {
2363        fn truncate(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
2364            Task::ready(Ok(()))
2365        }
2366    }
2367}