acp.rs

   1pub use acp::ToolCallId;
   2use agent_servers::AgentServer;
   3use agentic_coding_protocol::{self as acp, UserMessageChunk};
   4use anyhow::{Context as _, Result, anyhow};
   5use buffer_diff::BufferDiff;
   6use editor::{MultiBuffer, PathKey};
   7use futures::{FutureExt, channel::oneshot, future::BoxFuture};
   8use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
   9use itertools::Itertools;
  10use language::{Anchor, Buffer, Capability, LanguageRegistry, OffsetRangeExt as _};
  11use markdown::Markdown;
  12use project::Project;
  13use std::error::Error;
  14use std::fmt::{Formatter, Write};
  15use std::{
  16    fmt::Display,
  17    mem,
  18    path::{Path, PathBuf},
  19    sync::Arc,
  20};
  21use ui::{App, IconName};
  22use util::ResultExt;
  23
  24#[derive(Clone, Debug, Eq, PartialEq)]
  25pub struct UserMessage {
  26    pub content: Entity<Markdown>,
  27}
  28
  29impl UserMessage {
  30    pub fn from_acp(
  31        message: &acp::SendUserMessageParams,
  32        language_registry: Arc<LanguageRegistry>,
  33        cx: &mut App,
  34    ) -> Self {
  35        let mut md_source = String::new();
  36
  37        for chunk in &message.chunks {
  38            match chunk {
  39                UserMessageChunk::Text { text } => md_source.push_str(&text),
  40                UserMessageChunk::Path { path } => {
  41                    write!(&mut md_source, "{}", MentionPath(&path)).unwrap()
  42                }
  43            }
  44        }
  45
  46        Self {
  47            content: cx
  48                .new(|cx| Markdown::new(md_source.into(), Some(language_registry), None, cx)),
  49        }
  50    }
  51
  52    fn to_markdown(&self, cx: &App) -> String {
  53        format!("## User\n\n{}\n\n", self.content.read(cx).source())
  54    }
  55}
  56
  57#[derive(Debug)]
  58pub struct MentionPath<'a>(&'a Path);
  59
  60impl<'a> MentionPath<'a> {
  61    const PREFIX: &'static str = "@file:";
  62
  63    pub fn new(path: &'a Path) -> Self {
  64        MentionPath(path)
  65    }
  66
  67    pub fn try_parse(url: &'a str) -> Option<Self> {
  68        let path = url.strip_prefix(Self::PREFIX)?;
  69        Some(MentionPath(Path::new(path)))
  70    }
  71
  72    pub fn path(&self) -> &Path {
  73        self.0
  74    }
  75}
  76
  77impl Display for MentionPath<'_> {
  78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  79        write!(
  80            f,
  81            "[@{}]({}{})",
  82            self.0.file_name().unwrap_or_default().display(),
  83            Self::PREFIX,
  84            self.0.display()
  85        )
  86    }
  87}
  88
  89#[derive(Clone, Debug, Eq, PartialEq)]
  90pub struct AssistantMessage {
  91    pub chunks: Vec<AssistantMessageChunk>,
  92}
  93
  94impl AssistantMessage {
  95    fn to_markdown(&self, cx: &App) -> String {
  96        format!(
  97            "## Assistant\n\n{}\n\n",
  98            self.chunks
  99                .iter()
 100                .map(|chunk| chunk.to_markdown(cx))
 101                .join("\n\n")
 102        )
 103    }
 104}
 105
 106#[derive(Clone, Debug, Eq, PartialEq)]
 107pub enum AssistantMessageChunk {
 108    Text { chunk: Entity<Markdown> },
 109    Thought { chunk: Entity<Markdown> },
 110}
 111
 112impl AssistantMessageChunk {
 113    pub fn from_acp(
 114        chunk: acp::AssistantMessageChunk,
 115        language_registry: Arc<LanguageRegistry>,
 116        cx: &mut App,
 117    ) -> Self {
 118        match chunk {
 119            acp::AssistantMessageChunk::Text { text } => Self::Text {
 120                chunk: cx.new(|cx| Markdown::new(text.into(), Some(language_registry), None, cx)),
 121            },
 122            acp::AssistantMessageChunk::Thought { thought } => Self::Thought {
 123                chunk: cx
 124                    .new(|cx| Markdown::new(thought.into(), Some(language_registry), None, cx)),
 125            },
 126        }
 127    }
 128
 129    pub fn from_str(chunk: &str, language_registry: Arc<LanguageRegistry>, cx: &mut App) -> Self {
 130        Self::Text {
 131            chunk: cx.new(|cx| {
 132                Markdown::new(chunk.to_owned().into(), Some(language_registry), None, cx)
 133            }),
 134        }
 135    }
 136
 137    fn to_markdown(&self, cx: &App) -> String {
 138        match self {
 139            Self::Text { chunk } => chunk.read(cx).source().to_string(),
 140            Self::Thought { chunk } => {
 141                format!("<thinking>\n{}\n</thinking>", chunk.read(cx).source())
 142            }
 143        }
 144    }
 145}
 146
 147#[derive(Debug)]
 148pub enum AgentThreadEntry {
 149    UserMessage(UserMessage),
 150    AssistantMessage(AssistantMessage),
 151    ToolCall(ToolCall),
 152}
 153
 154impl AgentThreadEntry {
 155    fn to_markdown(&self, cx: &App) -> String {
 156        match self {
 157            Self::UserMessage(message) => message.to_markdown(cx),
 158            Self::AssistantMessage(message) => message.to_markdown(cx),
 159            Self::ToolCall(too_call) => too_call.to_markdown(cx),
 160        }
 161    }
 162}
 163
 164#[derive(Debug)]
 165pub struct ToolCall {
 166    pub id: acp::ToolCallId,
 167    pub label: Entity<Markdown>,
 168    pub icon: IconName,
 169    pub content: Option<ToolCallContent>,
 170    pub status: ToolCallStatus,
 171}
 172
 173impl ToolCall {
 174    fn to_markdown(&self, cx: &App) -> String {
 175        let mut markdown = format!(
 176            "**Tool Call: {}**\nStatus: {}\n\n",
 177            self.label.read(cx).source(),
 178            self.status
 179        );
 180        if let Some(content) = &self.content {
 181            markdown.push_str(content.to_markdown(cx).as_str());
 182            markdown.push_str("\n\n");
 183        }
 184        markdown
 185    }
 186}
 187
 188#[derive(Debug)]
 189pub enum ToolCallStatus {
 190    WaitingForConfirmation {
 191        confirmation: ToolCallConfirmation,
 192        respond_tx: oneshot::Sender<acp::ToolCallConfirmationOutcome>,
 193    },
 194    Allowed {
 195        status: acp::ToolCallStatus,
 196    },
 197    Rejected,
 198    Canceled,
 199}
 200
 201impl Display for ToolCallStatus {
 202    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 203        write!(
 204            f,
 205            "{}",
 206            match self {
 207                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 208                ToolCallStatus::Allowed { status } => match status {
 209                    acp::ToolCallStatus::Running => "Running",
 210                    acp::ToolCallStatus::Finished => "Finished",
 211                    acp::ToolCallStatus::Error => "Error",
 212                },
 213                ToolCallStatus::Rejected => "Rejected",
 214                ToolCallStatus::Canceled => "Canceled",
 215            }
 216        )
 217    }
 218}
 219
 220#[derive(Debug)]
 221pub enum ToolCallConfirmation {
 222    Edit {
 223        description: Option<Entity<Markdown>>,
 224    },
 225    Execute {
 226        command: String,
 227        root_command: String,
 228        description: Option<Entity<Markdown>>,
 229    },
 230    Mcp {
 231        server_name: String,
 232        tool_name: String,
 233        tool_display_name: String,
 234        description: Option<Entity<Markdown>>,
 235    },
 236    Fetch {
 237        urls: Vec<SharedString>,
 238        description: Option<Entity<Markdown>>,
 239    },
 240    Other {
 241        description: Entity<Markdown>,
 242    },
 243}
 244
 245impl ToolCallConfirmation {
 246    pub fn from_acp(
 247        confirmation: acp::ToolCallConfirmation,
 248        language_registry: Arc<LanguageRegistry>,
 249        cx: &mut App,
 250    ) -> Self {
 251        let to_md = |description: String, cx: &mut App| -> Entity<Markdown> {
 252            cx.new(|cx| {
 253                Markdown::new(
 254                    description.into(),
 255                    Some(language_registry.clone()),
 256                    None,
 257                    cx,
 258                )
 259            })
 260        };
 261
 262        match confirmation {
 263            acp::ToolCallConfirmation::Edit { description } => Self::Edit {
 264                description: description.map(|description| to_md(description, cx)),
 265            },
 266            acp::ToolCallConfirmation::Execute {
 267                command,
 268                root_command,
 269                description,
 270            } => Self::Execute {
 271                command,
 272                root_command,
 273                description: description.map(|description| to_md(description, cx)),
 274            },
 275            acp::ToolCallConfirmation::Mcp {
 276                server_name,
 277                tool_name,
 278                tool_display_name,
 279                description,
 280            } => Self::Mcp {
 281                server_name,
 282                tool_name,
 283                tool_display_name,
 284                description: description.map(|description| to_md(description, cx)),
 285            },
 286            acp::ToolCallConfirmation::Fetch { urls, description } => Self::Fetch {
 287                urls: urls.iter().map(|url| url.into()).collect(),
 288                description: description.map(|description| to_md(description, cx)),
 289            },
 290            acp::ToolCallConfirmation::Other { description } => Self::Other {
 291                description: to_md(description, cx),
 292            },
 293        }
 294    }
 295}
 296
 297#[derive(Debug)]
 298pub enum ToolCallContent {
 299    Markdown { markdown: Entity<Markdown> },
 300    Diff { diff: Diff },
 301}
 302
 303impl ToolCallContent {
 304    pub fn from_acp(
 305        content: acp::ToolCallContent,
 306        language_registry: Arc<LanguageRegistry>,
 307        cx: &mut App,
 308    ) -> Self {
 309        match content {
 310            acp::ToolCallContent::Markdown { markdown } => Self::Markdown {
 311                markdown: cx.new(|cx| Markdown::new_text(markdown.into(), cx)),
 312            },
 313            acp::ToolCallContent::Diff { diff } => Self::Diff {
 314                diff: Diff::from_acp(diff, language_registry, cx),
 315            },
 316        }
 317    }
 318
 319    fn to_markdown(&self, cx: &App) -> String {
 320        match self {
 321            Self::Markdown { markdown } => markdown.read(cx).source().to_string(),
 322            Self::Diff { diff } => diff.to_markdown(cx),
 323        }
 324    }
 325}
 326
 327#[derive(Debug)]
 328pub struct Diff {
 329    pub multibuffer: Entity<MultiBuffer>,
 330    pub path: PathBuf,
 331    _task: Task<Result<()>>,
 332}
 333
 334impl Diff {
 335    pub fn from_acp(
 336        diff: acp::Diff,
 337        language_registry: Arc<LanguageRegistry>,
 338        cx: &mut App,
 339    ) -> Self {
 340        let acp::Diff {
 341            path,
 342            old_text,
 343            new_text,
 344        } = diff;
 345
 346        let multibuffer = cx.new(|_cx| MultiBuffer::without_headers(Capability::ReadOnly));
 347
 348        let new_buffer = cx.new(|cx| Buffer::local(new_text, cx));
 349        let old_buffer = cx.new(|cx| Buffer::local(old_text.unwrap_or("".into()), cx));
 350        let new_buffer_snapshot = new_buffer.read(cx).text_snapshot();
 351        let old_buffer_snapshot = old_buffer.read(cx).snapshot();
 352        let buffer_diff = cx.new(|cx| BufferDiff::new(&new_buffer_snapshot, cx));
 353        let diff_task = buffer_diff.update(cx, |diff, cx| {
 354            diff.set_base_text(
 355                old_buffer_snapshot,
 356                Some(language_registry.clone()),
 357                new_buffer_snapshot,
 358                cx,
 359            )
 360        });
 361
 362        let task = cx.spawn({
 363            let multibuffer = multibuffer.clone();
 364            let path = path.clone();
 365            async move |cx| {
 366                diff_task.await?;
 367
 368                multibuffer
 369                    .update(cx, |multibuffer, cx| {
 370                        let hunk_ranges = {
 371                            let buffer = new_buffer.read(cx);
 372                            let diff = buffer_diff.read(cx);
 373                            diff.hunks_intersecting_range(Anchor::MIN..Anchor::MAX, &buffer, cx)
 374                                .map(|diff_hunk| diff_hunk.buffer_range.to_point(&buffer))
 375                                .collect::<Vec<_>>()
 376                        };
 377
 378                        multibuffer.set_excerpts_for_path(
 379                            PathKey::for_buffer(&new_buffer, cx),
 380                            new_buffer.clone(),
 381                            hunk_ranges,
 382                            editor::DEFAULT_MULTIBUFFER_CONTEXT,
 383                            cx,
 384                        );
 385                        multibuffer.add_diff(buffer_diff.clone(), cx);
 386                    })
 387                    .log_err();
 388
 389                if let Some(language) = language_registry
 390                    .language_for_file_path(&path)
 391                    .await
 392                    .log_err()
 393                {
 394                    new_buffer.update(cx, |buffer, cx| buffer.set_language(Some(language), cx))?;
 395                }
 396
 397                anyhow::Ok(())
 398            }
 399        });
 400
 401        Self {
 402            multibuffer,
 403            path,
 404            _task: task,
 405        }
 406    }
 407
 408    fn to_markdown(&self, cx: &App) -> String {
 409        let buffer_text = self
 410            .multibuffer
 411            .read(cx)
 412            .all_buffers()
 413            .iter()
 414            .map(|buffer| buffer.read(cx).text())
 415            .join("\n");
 416        format!("Diff: {}\n```\n{}\n```\n", self.path.display(), buffer_text)
 417    }
 418}
 419
/// A conversation with an agent over an ACP connection.
pub struct AcpThread {
    /// Entries in order; a tool call's id is its index in this list.
    entries: Vec<AgentThreadEntry>,
    /// Title returned by `title()`.
    title: SharedString,
    /// Project whose language registry is used when creating Markdown entities.
    project: Entity<Project>,
    /// `Some` while a send is in flight; `status()` reports `Idle` when `None`.
    send_task: Option<Task<()>>,
    /// Connection used to send requests to the agent.
    connection: Arc<acp::AgentConnection>,
    /// Exit-status task of the agent process; handed out once by `child_status()`.
    child_status: Option<Task<Result<()>>>,
    /// Keeps the connection's I/O future running.
    _io_task: Task<()>,
}
 429
 430pub enum AcpThreadEvent {
 431    NewEntry,
 432    EntryUpdated(usize),
 433}
 434
 435impl EventEmitter<AcpThreadEvent> for AcpThread {}
 436
 437#[derive(PartialEq, Eq)]
 438pub enum ThreadStatus {
 439    Idle,
 440    WaitingForToolConfirmation,
 441    Generating,
 442}
 443
 444#[derive(Debug, Clone)]
 445pub enum LoadError {
 446    Unsupported { current_version: SharedString },
 447    Exited(i32),
 448    Other(SharedString),
 449}
 450
 451impl Display for LoadError {
 452    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 453        match self {
 454            LoadError::Unsupported { current_version } => {
 455                write!(
 456                    f,
 457                    "Your installed version of Gemini {} doesn't support the Agentic Coding Protocol (ACP).",
 458                    current_version
 459                )
 460            }
 461            LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
 462            LoadError::Other(msg) => write!(f, "{}", msg),
 463        }
 464    }
 465}
 466
 467impl Error for LoadError {}
 468
 469impl AcpThread {
 470    pub async fn spawn(
 471        server: impl AgentServer + 'static,
 472        root_dir: &Path,
 473        project: Entity<Project>,
 474        cx: &mut AsyncApp,
 475    ) -> Result<Entity<Self>> {
 476        let command = match server.command(&project, cx).await {
 477            Ok(command) => command,
 478            Err(e) => return Err(anyhow!(LoadError::Other(format!("{e}").into()))),
 479        };
 480
 481        let mut child = util::command::new_smol_command(&command.path)
 482            .args(command.args.iter())
 483            .current_dir(root_dir)
 484            .stdin(std::process::Stdio::piped())
 485            .stdout(std::process::Stdio::piped())
 486            .stderr(std::process::Stdio::inherit())
 487            .kill_on_drop(true)
 488            .spawn()?;
 489
 490        let stdin = child.stdin.take().unwrap();
 491        let stdout = child.stdout.take().unwrap();
 492
 493        cx.new(|cx| {
 494            let foreground_executor = cx.foreground_executor().clone();
 495
 496            let (connection, io_fut) = acp::AgentConnection::connect_to_agent(
 497                AcpClientDelegate::new(cx.entity().downgrade(), cx.to_async()),
 498                stdin,
 499                stdout,
 500                move |fut| foreground_executor.spawn(fut).detach(),
 501            );
 502
 503            let io_task = cx.background_spawn(async move {
 504                io_fut.await.log_err();
 505            });
 506
 507            let child_status = cx.background_spawn(async move {
 508                match child.status().await {
 509                    Err(e) => Err(anyhow!(e)),
 510                    Ok(result) if result.success() => Ok(()),
 511                    Ok(result) => {
 512                        if let Some(version) = server.version(&command).await.log_err()
 513                            && !version.supported
 514                        {
 515                            Err(anyhow!(LoadError::Unsupported {
 516                                current_version: version.current_version
 517                            }))
 518                        } else {
 519                            Err(anyhow!(LoadError::Exited(result.code().unwrap_or(-127))))
 520                        }
 521                    }
 522                }
 523            });
 524
 525            Self {
 526                entries: Default::default(),
 527                title: "ACP Thread".into(),
 528                project,
 529                send_task: None,
 530                connection: Arc::new(connection),
 531                child_status: Some(child_status),
 532                _io_task: io_task,
 533            }
 534        })
 535    }
 536
 537    #[cfg(test)]
 538    pub fn fake(
 539        stdin: async_pipe::PipeWriter,
 540        stdout: async_pipe::PipeReader,
 541        project: Entity<Project>,
 542        cx: &mut Context<Self>,
 543    ) -> Self {
 544        let foreground_executor = cx.foreground_executor().clone();
 545
 546        let (connection, io_fut) = acp::AgentConnection::connect_to_agent(
 547            AcpClientDelegate::new(cx.entity().downgrade(), cx.to_async()),
 548            stdin,
 549            stdout,
 550            move |fut| {
 551                foreground_executor.spawn(fut).detach();
 552            },
 553        );
 554
 555        let io_task = cx.background_spawn({
 556            async move {
 557                io_fut.await.log_err();
 558            }
 559        });
 560
 561        Self {
 562            entries: Default::default(),
 563            title: "ACP Thread".into(),
 564            project,
 565            send_task: None,
 566            connection: Arc::new(connection),
 567            child_status: None,
 568            _io_task: io_task,
 569        }
 570    }
 571
 572    pub fn title(&self) -> SharedString {
 573        self.title.clone()
 574    }
 575
 576    pub fn entries(&self) -> &[AgentThreadEntry] {
 577        &self.entries
 578    }
 579
 580    pub fn status(&self) -> ThreadStatus {
 581        if self.send_task.is_some() {
 582            if self.waiting_for_tool_confirmation() {
 583                ThreadStatus::WaitingForToolConfirmation
 584            } else {
 585                ThreadStatus::Generating
 586            }
 587        } else {
 588            ThreadStatus::Idle
 589        }
 590    }
 591
    /// Appends `entry` to the thread and emits [`AcpThreadEvent::NewEntry`].
    pub fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
 596
 597    pub fn push_assistant_chunk(
 598        &mut self,
 599        chunk: acp::AssistantMessageChunk,
 600        cx: &mut Context<Self>,
 601    ) {
 602        let entries_len = self.entries.len();
 603        if let Some(last_entry) = self.entries.last_mut()
 604            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
 605        {
 606            cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
 607
 608            match (chunks.last_mut(), &chunk) {
 609                (
 610                    Some(AssistantMessageChunk::Text { chunk: old_chunk }),
 611                    acp::AssistantMessageChunk::Text { text: new_chunk },
 612                )
 613                | (
 614                    Some(AssistantMessageChunk::Thought { chunk: old_chunk }),
 615                    acp::AssistantMessageChunk::Thought { thought: new_chunk },
 616                ) => {
 617                    old_chunk.update(cx, |old_chunk, cx| {
 618                        old_chunk.append(&new_chunk, cx);
 619                    });
 620                }
 621                _ => {
 622                    chunks.push(AssistantMessageChunk::from_acp(
 623                        chunk,
 624                        self.project.read(cx).languages().clone(),
 625                        cx,
 626                    ));
 627                }
 628            }
 629        } else {
 630            let chunk = AssistantMessageChunk::from_acp(
 631                chunk,
 632                self.project.read(cx).languages().clone(),
 633                cx,
 634            );
 635
 636            self.push_entry(
 637                AgentThreadEntry::AssistantMessage(AssistantMessage {
 638                    chunks: vec![chunk],
 639                }),
 640                cx,
 641            );
 642        }
 643    }
 644
 645    pub fn request_tool_call(
 646        &mut self,
 647        label: String,
 648        icon: acp::Icon,
 649        content: Option<acp::ToolCallContent>,
 650        confirmation: acp::ToolCallConfirmation,
 651        cx: &mut Context<Self>,
 652    ) -> ToolCallRequest {
 653        let (tx, rx) = oneshot::channel();
 654
 655        let status = ToolCallStatus::WaitingForConfirmation {
 656            confirmation: ToolCallConfirmation::from_acp(
 657                confirmation,
 658                self.project.read(cx).languages().clone(),
 659                cx,
 660            ),
 661            respond_tx: tx,
 662        };
 663
 664        let id = self.insert_tool_call(label, status, icon, content, cx);
 665        ToolCallRequest { id, outcome: rx }
 666    }
 667
 668    pub fn push_tool_call(
 669        &mut self,
 670        label: String,
 671        icon: acp::Icon,
 672        content: Option<acp::ToolCallContent>,
 673        cx: &mut Context<Self>,
 674    ) -> acp::ToolCallId {
 675        let status = ToolCallStatus::Allowed {
 676            status: acp::ToolCallStatus::Running,
 677        };
 678
 679        self.insert_tool_call(label, status, icon, content, cx)
 680    }
 681
 682    fn insert_tool_call(
 683        &mut self,
 684        label: String,
 685        status: ToolCallStatus,
 686        icon: acp::Icon,
 687        content: Option<acp::ToolCallContent>,
 688        cx: &mut Context<Self>,
 689    ) -> acp::ToolCallId {
 690        let language_registry = self.project.read(cx).languages().clone();
 691        let id = acp::ToolCallId(self.entries.len() as u64);
 692
 693        self.push_entry(
 694            AgentThreadEntry::ToolCall(ToolCall {
 695                id,
 696                label: cx.new(|cx| {
 697                    Markdown::new(label.into(), Some(language_registry.clone()), None, cx)
 698                }),
 699                icon: acp_icon_to_ui_icon(icon),
 700                content: content
 701                    .map(|content| ToolCallContent::from_acp(content, language_registry, cx)),
 702                status,
 703            }),
 704            cx,
 705        );
 706
 707        id
 708    }
 709
 710    pub fn authorize_tool_call(
 711        &mut self,
 712        id: acp::ToolCallId,
 713        outcome: acp::ToolCallConfirmationOutcome,
 714        cx: &mut Context<Self>,
 715    ) {
 716        let Some((ix, call)) = self.tool_call_mut(id) else {
 717            return;
 718        };
 719
 720        let new_status = if outcome == acp::ToolCallConfirmationOutcome::Reject {
 721            ToolCallStatus::Rejected
 722        } else {
 723            ToolCallStatus::Allowed {
 724                status: acp::ToolCallStatus::Running,
 725            }
 726        };
 727
 728        let curr_status = mem::replace(&mut call.status, new_status);
 729
 730        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
 731            respond_tx.send(outcome).log_err();
 732        } else if cfg!(debug_assertions) {
 733            panic!("tried to authorize an already authorized tool call");
 734        }
 735
 736        cx.emit(AcpThreadEvent::EntryUpdated(ix));
 737    }
 738
 739    pub fn update_tool_call(
 740        &mut self,
 741        id: acp::ToolCallId,
 742        new_status: acp::ToolCallStatus,
 743        new_content: Option<acp::ToolCallContent>,
 744        cx: &mut Context<Self>,
 745    ) -> Result<()> {
 746        let language_registry = self.project.read(cx).languages().clone();
 747        let (ix, call) = self.tool_call_mut(id).context("Entry not found")?;
 748
 749        call.content = new_content
 750            .map(|new_content| ToolCallContent::from_acp(new_content, language_registry, cx));
 751
 752        match &mut call.status {
 753            ToolCallStatus::Allowed { status } => {
 754                *status = new_status;
 755            }
 756            ToolCallStatus::WaitingForConfirmation { .. } => {
 757                anyhow::bail!("Tool call hasn't been authorized yet")
 758            }
 759            ToolCallStatus::Rejected => {
 760                anyhow::bail!("Tool call was rejected and therefore can't be updated")
 761            }
 762            ToolCallStatus::Canceled => {
 763                call.status = ToolCallStatus::Allowed { status: new_status };
 764            }
 765        }
 766
 767        cx.emit(AcpThreadEvent::EntryUpdated(ix));
 768        Ok(())
 769    }
 770
 771    fn tool_call_mut(&mut self, id: acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
 772        let entry = self.entries.get_mut(id.0 as usize);
 773        debug_assert!(
 774            entry.is_some(),
 775            "We shouldn't give out ids to entries that don't exist"
 776        );
 777        match entry {
 778            Some(AgentThreadEntry::ToolCall(call)) if call.id == id => Some((id.0 as usize, call)),
 779            _ => {
 780                if cfg!(debug_assertions) {
 781                    panic!("entry is not a tool call");
 782                }
 783                None
 784            }
 785        }
 786    }
 787
 788    /// Returns true if the last turn is awaiting tool authorization
 789    pub fn waiting_for_tool_confirmation(&self) -> bool {
 790        for entry in self.entries.iter().rev() {
 791            match &entry {
 792                AgentThreadEntry::ToolCall(call) => match call.status {
 793                    ToolCallStatus::WaitingForConfirmation { .. } => return true,
 794                    ToolCallStatus::Allowed { .. }
 795                    | ToolCallStatus::Rejected
 796                    | ToolCallStatus::Canceled => continue,
 797                },
 798                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
 799                    // Reached the beginning of the turn
 800                    return false;
 801                }
 802            }
 803        }
 804        false
 805    }
 806
 807    pub fn initialize(&self) -> impl use<> + Future<Output = Result<acp::InitializeResponse>> {
 808        let connection = self.connection.clone();
 809        async move { Ok(connection.request(acp::InitializeParams).await?) }
 810    }
 811
 812    pub fn authenticate(&self) -> impl use<> + Future<Output = Result<()>> {
 813        let connection = self.connection.clone();
 814        async move { Ok(connection.request(acp::AuthenticateParams).await?) }
 815    }
 816
 817    #[cfg(test)]
 818    pub fn send_raw(
 819        &mut self,
 820        message: &str,
 821        cx: &mut Context<Self>,
 822    ) -> BoxFuture<'static, Result<()>> {
 823        self.send(
 824            acp::SendUserMessageParams {
 825                chunks: vec![acp::UserMessageChunk::Text {
 826                    text: message.to_string(),
 827                }],
 828            },
 829            cx,
 830        )
 831    }
 832
 833    pub fn send(
 834        &mut self,
 835        message: acp::SendUserMessageParams,
 836        cx: &mut Context<Self>,
 837    ) -> BoxFuture<'static, Result<()>> {
 838        let agent = self.connection.clone();
 839        self.push_entry(
 840            AgentThreadEntry::UserMessage(UserMessage::from_acp(
 841                &message,
 842                self.project.read(cx).languages().clone(),
 843                cx,
 844            )),
 845            cx,
 846        );
 847
 848        let (tx, rx) = oneshot::channel();
 849        let cancel = self.cancel(cx);
 850
 851        self.send_task = Some(cx.spawn(async move |this, cx| {
 852            cancel.await.log_err();
 853
 854            let result = agent.request(message).await;
 855            tx.send(result).log_err();
 856            this.update(cx, |this, _cx| this.send_task.take()).log_err();
 857        }));
 858
 859        async move {
 860            match rx.await {
 861                Ok(Err(e)) => Err(e)?,
 862                _ => Ok(()),
 863            }
 864        }
 865        .boxed()
 866    }
 867
 868    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 869        let agent = self.connection.clone();
 870
 871        if self.send_task.take().is_some() {
 872            cx.spawn(async move |this, cx| {
 873                agent.request(acp::CancelSendMessageParams).await?;
 874
 875                this.update(cx, |this, _cx| {
 876                    for entry in this.entries.iter_mut() {
 877                        if let AgentThreadEntry::ToolCall(call) = entry {
 878                            let cancel = matches!(
 879                                call.status,
 880                                ToolCallStatus::WaitingForConfirmation { .. }
 881                                    | ToolCallStatus::Allowed {
 882                                        status: acp::ToolCallStatus::Running
 883                                    }
 884                            );
 885
 886                            if cancel {
 887                                let curr_status =
 888                                    mem::replace(&mut call.status, ToolCallStatus::Canceled);
 889
 890                                if let ToolCallStatus::WaitingForConfirmation {
 891                                    respond_tx, ..
 892                                } = curr_status
 893                                {
 894                                    respond_tx
 895                                        .send(acp::ToolCallConfirmationOutcome::Cancel)
 896                                        .ok();
 897                                }
 898                            }
 899                        }
 900                    }
 901                })
 902            })
 903        } else {
 904            Task::ready(Ok(()))
 905        }
 906    }
 907
 908    pub fn child_status(&mut self) -> Option<Task<Result<()>>> {
 909        self.child_status.take()
 910    }
 911
 912    pub fn to_markdown(&self, cx: &App) -> String {
 913        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
 914    }
 915}
 916
 917struct AcpClientDelegate {
 918    thread: WeakEntity<AcpThread>,
 919    cx: AsyncApp,
 920    // sent_buffer_versions: HashMap<Entity<Buffer>, HashMap<u64, BufferSnapshot>>,
 921}
 922
 923impl AcpClientDelegate {
 924    fn new(thread: WeakEntity<AcpThread>, cx: AsyncApp) -> Self {
 925        Self { thread, cx }
 926    }
 927}
 928
 929impl acp::Client for AcpClientDelegate {
 930    async fn stream_assistant_message_chunk(
 931        &self,
 932        params: acp::StreamAssistantMessageChunkParams,
 933    ) -> Result<()> {
 934        let cx = &mut self.cx.clone();
 935
 936        cx.update(|cx| {
 937            self.thread
 938                .update(cx, |thread, cx| {
 939                    thread.push_assistant_chunk(params.chunk, cx)
 940                })
 941                .ok();
 942        })?;
 943
 944        Ok(())
 945    }
 946
 947    async fn request_tool_call_confirmation(
 948        &self,
 949        request: acp::RequestToolCallConfirmationParams,
 950    ) -> Result<acp::RequestToolCallConfirmationResponse> {
 951        let cx = &mut self.cx.clone();
 952        let ToolCallRequest { id, outcome } = cx
 953            .update(|cx| {
 954                self.thread.update(cx, |thread, cx| {
 955                    thread.request_tool_call(
 956                        request.label,
 957                        request.icon,
 958                        request.content,
 959                        request.confirmation,
 960                        cx,
 961                    )
 962                })
 963            })?
 964            .context("Failed to update thread")?;
 965
 966        Ok(acp::RequestToolCallConfirmationResponse {
 967            id,
 968            outcome: outcome.await?,
 969        })
 970    }
 971
 972    async fn push_tool_call(
 973        &self,
 974        request: acp::PushToolCallParams,
 975    ) -> Result<acp::PushToolCallResponse> {
 976        let cx = &mut self.cx.clone();
 977        let id = cx
 978            .update(|cx| {
 979                self.thread.update(cx, |thread, cx| {
 980                    thread.push_tool_call(request.label, request.icon, request.content, cx)
 981                })
 982            })?
 983            .context("Failed to update thread")?;
 984
 985        Ok(acp::PushToolCallResponse { id })
 986    }
 987
 988    async fn update_tool_call(&self, request: acp::UpdateToolCallParams) -> Result<()> {
 989        let cx = &mut self.cx.clone();
 990
 991        cx.update(|cx| {
 992            self.thread.update(cx, |thread, cx| {
 993                thread.update_tool_call(request.tool_call_id, request.status, request.content, cx)
 994            })
 995        })?
 996        .context("Failed to update thread")??;
 997
 998        Ok(())
 999    }
1000}
1001
1002fn acp_icon_to_ui_icon(icon: acp::Icon) -> IconName {
1003    match icon {
1004        acp::Icon::FileSearch => IconName::ToolSearch,
1005        acp::Icon::Folder => IconName::ToolFolder,
1006        acp::Icon::Globe => IconName::ToolWeb,
1007        acp::Icon::Hammer => IconName::ToolHammer,
1008        acp::Icon::LightBulb => IconName::ToolBulb,
1009        acp::Icon::Pencil => IconName::ToolPencil,
1010        acp::Icon::Regex => IconName::ToolRegex,
1011        acp::Icon::Terminal => IconName::ToolTerminal,
1012    }
1013}
1014
1015pub struct ToolCallRequest {
1016    pub id: acp::ToolCallId,
1017    pub outcome: oneshot::Receiver<acp::ToolCallConfirmationOutcome>,
1018}
1019
1020#[cfg(test)]
1021mod tests {
1022    use super::*;
1023    use agent_servers::{AgentServerCommand, AgentServerVersion};
1024    use async_pipe::{PipeReader, PipeWriter};
1025    use futures::{channel::mpsc, future::LocalBoxFuture, select};
1026    use gpui::{AsyncApp, TestAppContext};
1027    use indoc::indoc;
1028    use project::FakeFs;
1029    use serde_json::json;
1030    use settings::SettingsStore;
1031    use smol::{future::BoxedLocal, stream::StreamExt as _};
1032    use std::{cell::RefCell, env, path::Path, rc::Rc, time::Duration};
1033    use util::path;
1034
1035    fn init_test(cx: &mut TestAppContext) {
1036        env_logger::try_init().ok();
1037        cx.update(|cx| {
1038            let settings_store = SettingsStore::test(cx);
1039            cx.set_global(settings_store);
1040            Project::init_settings(cx);
1041            language::init(cx);
1042        });
1043    }
1044
1045    #[gpui::test]
1046    async fn test_thinking_concatenation(cx: &mut TestAppContext) {
1047        init_test(cx);
1048
1049        let fs = FakeFs::new(cx.executor());
1050        let project = Project::test(fs, [], cx).await;
1051        let (thread, fake_server) = fake_acp_thread(project, cx);
1052
1053        fake_server.update(cx, |fake_server, _| {
1054            fake_server.on_user_message(move |_, server, mut cx| async move {
1055                server
1056                    .update(&mut cx, |server, _| {
1057                        server.send_to_zed(acp::StreamAssistantMessageChunkParams {
1058                            chunk: acp::AssistantMessageChunk::Thought {
1059                                thought: "Thinking ".into(),
1060                            },
1061                        })
1062                    })?
1063                    .await
1064                    .unwrap();
1065                server
1066                    .update(&mut cx, |server, _| {
1067                        server.send_to_zed(acp::StreamAssistantMessageChunkParams {
1068                            chunk: acp::AssistantMessageChunk::Thought {
1069                                thought: "hard!".into(),
1070                            },
1071                        })
1072                    })?
1073                    .await
1074                    .unwrap();
1075
1076                Ok(())
1077            })
1078        });
1079
1080        thread
1081            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
1082            .await
1083            .unwrap();
1084
1085        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
1086        assert_eq!(
1087            output,
1088            indoc! {r#"
1089            ## User
1090
1091            Hello from Zed!
1092
1093            ## Assistant
1094
1095            <thinking>
1096            Thinking hard!
1097            </thinking>
1098
1099            "#}
1100        );
1101    }
1102
1103    #[gpui::test]
1104    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
1105        init_test(cx);
1106
1107        let fs = FakeFs::new(cx.executor());
1108        let project = Project::test(fs, [], cx).await;
1109        let (thread, fake_server) = fake_acp_thread(project, cx);
1110
1111        let (end_turn_tx, end_turn_rx) = oneshot::channel::<()>();
1112
1113        let tool_call_id = Rc::new(RefCell::new(None));
1114        let end_turn_rx = Rc::new(RefCell::new(Some(end_turn_rx)));
1115        fake_server.update(cx, |fake_server, _| {
1116            let tool_call_id = tool_call_id.clone();
1117            fake_server.on_user_message(move |_, server, mut cx| {
1118                let end_turn_rx = end_turn_rx.clone();
1119                let tool_call_id = tool_call_id.clone();
1120                async move {
1121                    let tool_call_result = server
1122                        .update(&mut cx, |server, _| {
1123                            server.send_to_zed(acp::PushToolCallParams {
1124                                label: "Fetch".to_string(),
1125                                icon: acp::Icon::Globe,
1126                                content: None,
1127                            })
1128                        })?
1129                        .await
1130                        .unwrap();
1131                    *tool_call_id.clone().borrow_mut() = Some(tool_call_result.id);
1132                    end_turn_rx.take().unwrap().await.ok();
1133
1134                    Ok(())
1135                }
1136            })
1137        });
1138
1139        let request = thread.update(cx, |thread, cx| {
1140            thread.send_raw("Fetch https://example.com", cx)
1141        });
1142
1143        run_until_first_tool_call(&thread, cx).await;
1144
1145        thread.read_with(cx, |thread, _| {
1146            assert!(matches!(
1147                thread.entries[1],
1148                AgentThreadEntry::ToolCall(ToolCall {
1149                    status: ToolCallStatus::Allowed {
1150                        status: acp::ToolCallStatus::Running,
1151                        ..
1152                    },
1153                    ..
1154                })
1155            ));
1156        });
1157
1158        cx.run_until_parked();
1159
1160        thread
1161            .update(cx, |thread, cx| thread.cancel(cx))
1162            .await
1163            .unwrap();
1164
1165        thread.read_with(cx, |thread, _| {
1166            assert!(matches!(
1167                &thread.entries[1],
1168                AgentThreadEntry::ToolCall(ToolCall {
1169                    status: ToolCallStatus::Canceled,
1170                    ..
1171                })
1172            ));
1173        });
1174
1175        fake_server
1176            .update(cx, |fake_server, _| {
1177                fake_server.send_to_zed(acp::UpdateToolCallParams {
1178                    tool_call_id: tool_call_id.borrow().unwrap(),
1179                    status: acp::ToolCallStatus::Finished,
1180                    content: None,
1181                })
1182            })
1183            .await
1184            .unwrap();
1185
1186        drop(end_turn_tx);
1187        request.await.unwrap();
1188
1189        thread.read_with(cx, |thread, _| {
1190            assert!(matches!(
1191                thread.entries[1],
1192                AgentThreadEntry::ToolCall(ToolCall {
1193                    status: ToolCallStatus::Allowed {
1194                        status: acp::ToolCallStatus::Finished,
1195                        ..
1196                    },
1197                    ..
1198                })
1199            ));
1200        });
1201    }
1202
1203    #[gpui::test]
1204    #[cfg_attr(not(feature = "gemini"), ignore)]
1205    async fn test_gemini_basic(cx: &mut TestAppContext) {
1206        init_test(cx);
1207
1208        cx.executor().allow_parking();
1209
1210        let fs = FakeFs::new(cx.executor());
1211        let project = Project::test(fs, [], cx).await;
1212        let thread = gemini_acp_thread(project.clone(), "/private/tmp", cx).await;
1213        thread
1214            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
1215            .await
1216            .unwrap();
1217
1218        thread.read_with(cx, |thread, _| {
1219            assert_eq!(thread.entries.len(), 2);
1220            assert!(matches!(
1221                thread.entries[0],
1222                AgentThreadEntry::UserMessage(_)
1223            ));
1224            assert!(matches!(
1225                thread.entries[1],
1226                AgentThreadEntry::AssistantMessage(_)
1227            ));
1228        });
1229    }
1230
1231    #[gpui::test]
1232    #[cfg_attr(not(feature = "gemini"), ignore)]
1233    async fn test_gemini_path_mentions(cx: &mut TestAppContext) {
1234        init_test(cx);
1235
1236        cx.executor().allow_parking();
1237        let tempdir = tempfile::tempdir().unwrap();
1238        std::fs::write(
1239            tempdir.path().join("foo.rs"),
1240            indoc! {"
1241                fn main() {
1242                    println!(\"Hello, world!\");
1243                }
1244            "},
1245        )
1246        .expect("failed to write file");
1247        let project = Project::example([tempdir.path()], &mut cx.to_async()).await;
1248        let thread = gemini_acp_thread(project.clone(), tempdir.path(), cx).await;
1249        thread
1250            .update(cx, |thread, cx| {
1251                thread.send(
1252                    acp::SendUserMessageParams {
1253                        chunks: vec![
1254                            acp::UserMessageChunk::Text {
1255                                text: "Read the file ".into(),
1256                            },
1257                            acp::UserMessageChunk::Path {
1258                                path: Path::new("foo.rs").into(),
1259                            },
1260                            acp::UserMessageChunk::Text {
1261                                text: " and tell me what the content of the println! is".into(),
1262                            },
1263                        ],
1264                    },
1265                    cx,
1266                )
1267            })
1268            .await
1269            .unwrap();
1270
1271        thread.read_with(cx, |thread, cx| {
1272            assert_eq!(thread.entries.len(), 3);
1273            assert!(matches!(
1274                thread.entries[0],
1275                AgentThreadEntry::UserMessage(_)
1276            ));
1277            assert!(matches!(thread.entries[1], AgentThreadEntry::ToolCall(_)));
1278            let AgentThreadEntry::AssistantMessage(assistant_message) = &thread.entries[2] else {
1279                panic!("Expected AssistantMessage")
1280            };
1281            assert!(
1282                assistant_message.to_markdown(cx).contains("Hello, world!"),
1283                "unexpected assistant message: {:?}",
1284                assistant_message.to_markdown(cx)
1285            );
1286        });
1287    }
1288
1289    #[gpui::test]
1290    #[cfg_attr(not(feature = "gemini"), ignore)]
1291    async fn test_gemini_tool_call(cx: &mut TestAppContext) {
1292        init_test(cx);
1293
1294        cx.executor().allow_parking();
1295
1296        let fs = FakeFs::new(cx.executor());
1297        fs.insert_tree(
1298            path!("/private/tmp"),
1299            json!({"foo": "Lorem ipsum dolor", "bar": "bar", "baz": "baz"}),
1300        )
1301        .await;
1302        let project = Project::test(fs, [path!("/private/tmp").as_ref()], cx).await;
1303        let thread = gemini_acp_thread(project.clone(), "/private/tmp", cx).await;
1304        thread
1305            .update(cx, |thread, cx| {
1306                thread.send_raw(
1307                    "Read the '/private/tmp/foo' file and tell me what you see.",
1308                    cx,
1309                )
1310            })
1311            .await
1312            .unwrap();
1313        thread.read_with(cx, |thread, _cx| {
1314            assert!(matches!(
1315                &thread.entries()[2],
1316                AgentThreadEntry::ToolCall(ToolCall {
1317                    status: ToolCallStatus::Allowed { .. },
1318                    ..
1319                })
1320            ));
1321
1322            assert!(matches!(
1323                thread.entries[3],
1324                AgentThreadEntry::AssistantMessage(_)
1325            ));
1326        });
1327    }
1328
1329    #[gpui::test]
1330    #[cfg_attr(not(feature = "gemini"), ignore)]
1331    async fn test_gemini_tool_call_with_confirmation(cx: &mut TestAppContext) {
1332        init_test(cx);
1333
1334        cx.executor().allow_parking();
1335
1336        let fs = FakeFs::new(cx.executor());
1337        let project = Project::test(fs, [path!("/private/tmp").as_ref()], cx).await;
1338        let thread = gemini_acp_thread(project.clone(), "/private/tmp", cx).await;
1339        let full_turn = thread.update(cx, |thread, cx| {
1340            thread.send_raw(r#"Run `echo "Hello, world!"`"#, cx)
1341        });
1342
1343        run_until_first_tool_call(&thread, cx).await;
1344
1345        let tool_call_id = thread.read_with(cx, |thread, _cx| {
1346            let AgentThreadEntry::ToolCall(ToolCall {
1347                id,
1348                status:
1349                    ToolCallStatus::WaitingForConfirmation {
1350                        confirmation: ToolCallConfirmation::Execute { root_command, .. },
1351                        ..
1352                    },
1353                ..
1354            }) = &thread.entries()[2]
1355            else {
1356                panic!();
1357            };
1358
1359            assert_eq!(root_command, "echo");
1360
1361            *id
1362        });
1363
1364        thread.update(cx, |thread, cx| {
1365            thread.authorize_tool_call(tool_call_id, acp::ToolCallConfirmationOutcome::Allow, cx);
1366
1367            assert!(matches!(
1368                &thread.entries()[2],
1369                AgentThreadEntry::ToolCall(ToolCall {
1370                    status: ToolCallStatus::Allowed { .. },
1371                    ..
1372                })
1373            ));
1374        });
1375
1376        full_turn.await.unwrap();
1377
1378        thread.read_with(cx, |thread, cx| {
1379            let AgentThreadEntry::ToolCall(ToolCall {
1380                content: Some(ToolCallContent::Markdown { markdown }),
1381                status: ToolCallStatus::Allowed { .. },
1382                ..
1383            }) = &thread.entries()[2]
1384            else {
1385                panic!();
1386            };
1387
1388            markdown.read_with(cx, |md, _cx| {
1389                assert!(
1390                    md.source().contains("Hello, world!"),
1391                    r#"Expected '{}' to contain "Hello, world!""#,
1392                    md.source()
1393                );
1394            });
1395        });
1396    }
1397
1398    #[gpui::test]
1399    #[cfg_attr(not(feature = "gemini"), ignore)]
1400    async fn test_gemini_cancel(cx: &mut TestAppContext) {
1401        init_test(cx);
1402
1403        cx.executor().allow_parking();
1404
1405        let fs = FakeFs::new(cx.executor());
1406        let project = Project::test(fs, [path!("/private/tmp").as_ref()], cx).await;
1407        let thread = gemini_acp_thread(project.clone(), "/private/tmp", cx).await;
1408        let full_turn = thread.update(cx, |thread, cx| {
1409            thread.send_raw(r#"Run `echo "Hello, world!"`"#, cx)
1410        });
1411
1412        let first_tool_call_ix = run_until_first_tool_call(&thread, cx).await;
1413
1414        thread.read_with(cx, |thread, _cx| {
1415            let AgentThreadEntry::ToolCall(ToolCall {
1416                id,
1417                status:
1418                    ToolCallStatus::WaitingForConfirmation {
1419                        confirmation: ToolCallConfirmation::Execute { root_command, .. },
1420                        ..
1421                    },
1422                ..
1423            }) = &thread.entries()[first_tool_call_ix]
1424            else {
1425                panic!("{:?}", thread.entries()[1]);
1426            };
1427
1428            assert_eq!(root_command, "echo");
1429
1430            *id
1431        });
1432
1433        thread
1434            .update(cx, |thread, cx| thread.cancel(cx))
1435            .await
1436            .unwrap();
1437        full_turn.await.unwrap();
1438        thread.read_with(cx, |thread, _| {
1439            let AgentThreadEntry::ToolCall(ToolCall {
1440                status: ToolCallStatus::Canceled,
1441                ..
1442            }) = &thread.entries()[first_tool_call_ix]
1443            else {
1444                panic!();
1445            };
1446        });
1447
1448        thread
1449            .update(cx, |thread, cx| {
1450                thread.send_raw(r#"Stop running and say goodbye to me."#, cx)
1451            })
1452            .await
1453            .unwrap();
1454        thread.read_with(cx, |thread, _| {
1455            assert!(matches!(
1456                &thread.entries().last().unwrap(),
1457                AgentThreadEntry::AssistantMessage(..),
1458            ))
1459        });
1460    }
1461
1462    async fn run_until_first_tool_call(
1463        thread: &Entity<AcpThread>,
1464        cx: &mut TestAppContext,
1465    ) -> usize {
1466        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
1467
1468        let subscription = cx.update(|cx| {
1469            cx.subscribe(thread, move |thread, _, cx| {
1470                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
1471                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
1472                        return tx.try_send(ix).unwrap();
1473                    }
1474                }
1475            })
1476        });
1477
1478        select! {
1479            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
1480                panic!("Timeout waiting for tool call")
1481            }
1482            ix = rx.next().fuse() => {
1483                drop(subscription);
1484                ix.unwrap()
1485            }
1486        }
1487    }
1488
1489    pub async fn gemini_acp_thread(
1490        project: Entity<Project>,
1491        current_dir: impl AsRef<Path>,
1492        cx: &mut TestAppContext,
1493    ) -> Entity<AcpThread> {
1494        struct DevGemini;
1495
1496        impl agent_servers::AgentServer for DevGemini {
1497            async fn command(
1498                &self,
1499                _project: &Entity<Project>,
1500                _cx: &mut AsyncApp,
1501            ) -> Result<agent_servers::AgentServerCommand> {
1502                let cli_path = Path::new(env!("CARGO_MANIFEST_DIR"))
1503                    .join("../../../gemini-cli/packages/cli")
1504                    .to_string_lossy()
1505                    .to_string();
1506
1507                Ok(AgentServerCommand {
1508                    path: "node".into(),
1509                    args: vec![cli_path, "--acp".into()],
1510                    env: None,
1511                })
1512            }
1513
1514            async fn version(
1515                &self,
1516                _command: &agent_servers::AgentServerCommand,
1517            ) -> Result<AgentServerVersion> {
1518                Ok(AgentServerVersion {
1519                    current_version: "0.1.0".into(),
1520                    supported: true,
1521                })
1522            }
1523        }
1524
1525        let thread = AcpThread::spawn(DevGemini, current_dir.as_ref(), project, &mut cx.to_async())
1526            .await
1527            .unwrap();
1528
1529        thread
1530            .update(cx, |thread, _| thread.initialize())
1531            .await
1532            .unwrap();
1533        thread
1534    }
1535
1536    pub fn fake_acp_thread(
1537        project: Entity<Project>,
1538        cx: &mut TestAppContext,
1539    ) -> (Entity<AcpThread>, Entity<FakeAcpServer>) {
1540        let (stdin_tx, stdin_rx) = async_pipe::pipe();
1541        let (stdout_tx, stdout_rx) = async_pipe::pipe();
1542        let thread = cx.update(|cx| cx.new(|cx| AcpThread::fake(stdin_tx, stdout_rx, project, cx)));
1543        let agent = cx.update(|cx| cx.new(|cx| FakeAcpServer::new(stdin_rx, stdout_tx, cx)));
1544        (thread, agent)
1545    }
1546
1547    pub struct FakeAcpServer {
1548        connection: acp::ClientConnection,
1549        _io_task: Task<()>,
1550        on_user_message: Option<
1551            Rc<
1552                dyn Fn(
1553                    acp::SendUserMessageParams,
1554                    Entity<FakeAcpServer>,
1555                    AsyncApp,
1556                ) -> LocalBoxFuture<'static, Result<()>>,
1557            >,
1558        >,
1559    }
1560
1561    #[derive(Clone)]
1562    struct FakeAgent {
1563        server: Entity<FakeAcpServer>,
1564        cx: AsyncApp,
1565    }
1566
1567    impl acp::Agent for FakeAgent {
1568        async fn initialize(&self) -> Result<acp::InitializeResponse> {
1569            Ok(acp::InitializeResponse {
1570                is_authenticated: true,
1571            })
1572        }
1573
1574        async fn authenticate(&self) -> Result<()> {
1575            Ok(())
1576        }
1577
1578        async fn cancel_send_message(&self) -> Result<()> {
1579            Ok(())
1580        }
1581
1582        async fn send_user_message(&self, request: acp::SendUserMessageParams) -> Result<()> {
1583            let mut cx = self.cx.clone();
1584            let handler = self
1585                .server
1586                .update(&mut cx, |server, _| server.on_user_message.clone())
1587                .ok()
1588                .flatten();
1589            if let Some(handler) = handler {
1590                handler(request, self.server.clone(), self.cx.clone()).await
1591            } else {
1592                anyhow::bail!("No handler for on_user_message")
1593            }
1594        }
1595    }
1596
1597    impl FakeAcpServer {
1598        fn new(stdin: PipeReader, stdout: PipeWriter, cx: &Context<Self>) -> Self {
1599            let agent = FakeAgent {
1600                server: cx.entity(),
1601                cx: cx.to_async(),
1602            };
1603            let foreground_executor = cx.foreground_executor().clone();
1604
1605            let (connection, io_fut) = acp::ClientConnection::connect_to_client(
1606                agent.clone(),
1607                stdout,
1608                stdin,
1609                move |fut| {
1610                    foreground_executor.spawn(fut).detach();
1611                },
1612            );
1613            FakeAcpServer {
1614                connection: connection,
1615                on_user_message: None,
1616                _io_task: cx.background_spawn(async move {
1617                    io_fut.await.log_err();
1618                }),
1619            }
1620        }
1621
1622        fn on_user_message<F>(
1623            &mut self,
1624            handler: impl for<'a> Fn(acp::SendUserMessageParams, Entity<FakeAcpServer>, AsyncApp) -> F
1625            + 'static,
1626        ) where
1627            F: Future<Output = Result<()>> + 'static,
1628        {
1629            self.on_user_message
1630                .replace(Rc::new(move |request, server, cx| {
1631                    handler(request, server, cx).boxed_local()
1632                }));
1633        }
1634
1635        fn send_to_zed<T: acp::ClientRequest + 'static>(
1636            &self,
1637            message: T,
1638        ) -> BoxedLocal<Result<T::Response>> {
1639            self.connection
1640                .request(message)
1641                .map(|f| f.map_err(|err| anyhow!(err)))
1642                .boxed_local()
1643        }
1644    }
1645}