acp.rs

   1pub use acp::ToolCallId;
   2use agent_servers::AgentServer;
   3use agentic_coding_protocol::{self as acp, UserMessageChunk};
   4use anyhow::{Context as _, Result, anyhow};
   5use buffer_diff::BufferDiff;
   6use editor::{MultiBuffer, PathKey};
   7use futures::{FutureExt, channel::oneshot, future::BoxFuture};
   8use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
   9use itertools::Itertools;
  10use language::{Anchor, Buffer, Capability, LanguageRegistry, OffsetRangeExt as _};
  11use markdown::Markdown;
  12use project::Project;
  13use std::error::Error;
  14use std::fmt::{Formatter, Write};
  15use std::{
  16    fmt::Display,
  17    mem,
  18    path::{Path, PathBuf},
  19    sync::Arc,
  20};
  21use ui::{App, IconName};
  22use util::ResultExt;
  23
  24#[derive(Clone, Debug, Eq, PartialEq)]
  25pub struct UserMessage {
  26    pub content: Entity<Markdown>,
  27}
  28
  29impl UserMessage {
  30    pub fn from_acp(
  31        message: acp::UserMessage,
  32        language_registry: Arc<LanguageRegistry>,
  33        cx: &mut App,
  34    ) -> Self {
  35        let mut md_source = String::new();
  36
  37        for chunk in message.chunks {
  38            match chunk {
  39                UserMessageChunk::Text { chunk } => md_source.push_str(&chunk),
  40                UserMessageChunk::Path { path } => {
  41                    write!(&mut md_source, "{}", MentionPath(&path)).unwrap()
  42                }
  43            }
  44        }
  45
  46        Self {
  47            content: cx
  48                .new(|cx| Markdown::new(md_source.into(), Some(language_registry), None, cx)),
  49        }
  50    }
  51
  52    fn to_markdown(&self, cx: &App) -> String {
  53        format!("## User\n\n{}\n\n", self.content.read(cx).source())
  54    }
  55}
  56
  57#[derive(Debug)]
  58pub struct MentionPath<'a>(&'a Path);
  59
  60impl<'a> MentionPath<'a> {
  61    const PREFIX: &'static str = "@file:";
  62
  63    pub fn new(path: &'a Path) -> Self {
  64        MentionPath(path)
  65    }
  66
  67    pub fn try_parse(url: &'a str) -> Option<Self> {
  68        let path = url.strip_prefix(Self::PREFIX)?;
  69        Some(MentionPath(Path::new(path)))
  70    }
  71
  72    pub fn path(&self) -> &Path {
  73        self.0
  74    }
  75}
  76
  77impl Display for MentionPath<'_> {
  78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  79        write!(
  80            f,
  81            "[@{}]({}{})",
  82            self.0.file_name().unwrap_or_default().display(),
  83            Self::PREFIX,
  84            self.0.display()
  85        )
  86    }
  87}
  88
  89#[derive(Clone, Debug, Eq, PartialEq)]
  90pub struct AssistantMessage {
  91    pub chunks: Vec<AssistantMessageChunk>,
  92}
  93
  94impl AssistantMessage {
  95    fn to_markdown(&self, cx: &App) -> String {
  96        format!(
  97            "## Assistant\n\n{}\n\n",
  98            self.chunks
  99                .iter()
 100                .map(|chunk| chunk.to_markdown(cx))
 101                .join("\n\n")
 102        )
 103    }
 104}
 105
 106#[derive(Clone, Debug, Eq, PartialEq)]
 107pub enum AssistantMessageChunk {
 108    Text { chunk: Entity<Markdown> },
 109    Thought { chunk: Entity<Markdown> },
 110}
 111
 112impl AssistantMessageChunk {
 113    pub fn from_acp(
 114        chunk: acp::AssistantMessageChunk,
 115        language_registry: Arc<LanguageRegistry>,
 116        cx: &mut App,
 117    ) -> Self {
 118        match chunk {
 119            acp::AssistantMessageChunk::Text { chunk } => Self::Text {
 120                chunk: cx.new(|cx| Markdown::new(chunk.into(), Some(language_registry), None, cx)),
 121            },
 122            acp::AssistantMessageChunk::Thought { chunk } => Self::Thought {
 123                chunk: cx.new(|cx| Markdown::new(chunk.into(), Some(language_registry), None, cx)),
 124            },
 125        }
 126    }
 127
 128    pub fn from_str(chunk: &str, language_registry: Arc<LanguageRegistry>, cx: &mut App) -> Self {
 129        Self::Text {
 130            chunk: cx.new(|cx| {
 131                Markdown::new(chunk.to_owned().into(), Some(language_registry), None, cx)
 132            }),
 133        }
 134    }
 135
 136    fn to_markdown(&self, cx: &App) -> String {
 137        match self {
 138            Self::Text { chunk } => chunk.read(cx).source().to_string(),
 139            Self::Thought { chunk } => {
 140                format!("<thinking>\n{}\n</thinking>", chunk.read(cx).source())
 141            }
 142        }
 143    }
 144}
 145
 146#[derive(Debug)]
 147pub enum AgentThreadEntry {
 148    UserMessage(UserMessage),
 149    AssistantMessage(AssistantMessage),
 150    ToolCall(ToolCall),
 151}
 152
 153impl AgentThreadEntry {
 154    fn to_markdown(&self, cx: &App) -> String {
 155        match self {
 156            Self::UserMessage(message) => message.to_markdown(cx),
 157            Self::AssistantMessage(message) => message.to_markdown(cx),
 158            Self::ToolCall(too_call) => too_call.to_markdown(cx),
 159        }
 160    }
 161}
 162
 163#[derive(Debug)]
 164pub struct ToolCall {
 165    pub id: acp::ToolCallId,
 166    pub label: Entity<Markdown>,
 167    pub icon: IconName,
 168    pub content: Option<ToolCallContent>,
 169    pub status: ToolCallStatus,
 170}
 171
 172impl ToolCall {
 173    fn to_markdown(&self, cx: &App) -> String {
 174        let mut markdown = format!(
 175            "**Tool Call: {}**\nStatus: {}\n\n",
 176            self.label.read(cx).source(),
 177            self.status
 178        );
 179        if let Some(content) = &self.content {
 180            markdown.push_str(content.to_markdown(cx).as_str());
 181            markdown.push_str("\n\n");
 182        }
 183        markdown
 184    }
 185}
 186
/// Lifecycle state of a [`ToolCall`].
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The call has not been decided yet.
    WaitingForConfirmation {
        /// Details of what is being confirmed.
        confirmation: ToolCallConfirmation,
        /// Sender through which the confirmation outcome is delivered.
        respond_tx: oneshot::Sender<acp::ToolCallConfirmationOutcome>,
    },
    /// The call is allowed; `status` carries the protocol-level progress.
    Allowed {
        status: acp::ToolCallStatus,
    },
    Rejected,
    Canceled,
}
 199
 200impl Display for ToolCallStatus {
 201    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 202        write!(
 203            f,
 204            "{}",
 205            match self {
 206                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 207                ToolCallStatus::Allowed { status } => match status {
 208                    acp::ToolCallStatus::Running => "Running",
 209                    acp::ToolCallStatus::Finished => "Finished",
 210                    acp::ToolCallStatus::Error => "Error",
 211                },
 212                ToolCallStatus::Rejected => "Rejected",
 213                ToolCallStatus::Canceled => "Canceled",
 214            }
 215        )
 216    }
 217}
 218
/// What the user is asked to confirm before a tool call runs.
///
/// Each variant mirrors the `acp::ToolCallConfirmation` variant of the same
/// name, with descriptions converted into Markdown entities.
#[derive(Debug)]
pub enum ToolCallConfirmation {
    Edit {
        description: Option<Entity<Markdown>>,
    },
    Execute {
        command: String,
        root_command: String,
        description: Option<Entity<Markdown>>,
    },
    Mcp {
        server_name: String,
        tool_name: String,
        tool_display_name: String,
        description: Option<Entity<Markdown>>,
    },
    Fetch {
        urls: Vec<SharedString>,
        description: Option<Entity<Markdown>>,
    },
    /// Any other confirmation; unlike the other variants the description is required.
    Other {
        description: Entity<Markdown>,
    },
}
 243
 244impl ToolCallConfirmation {
 245    pub fn from_acp(
 246        confirmation: acp::ToolCallConfirmation,
 247        language_registry: Arc<LanguageRegistry>,
 248        cx: &mut App,
 249    ) -> Self {
 250        let to_md = |description: String, cx: &mut App| -> Entity<Markdown> {
 251            cx.new(|cx| {
 252                Markdown::new(
 253                    description.into(),
 254                    Some(language_registry.clone()),
 255                    None,
 256                    cx,
 257                )
 258            })
 259        };
 260
 261        match confirmation {
 262            acp::ToolCallConfirmation::Edit { description } => Self::Edit {
 263                description: description.map(|description| to_md(description, cx)),
 264            },
 265            acp::ToolCallConfirmation::Execute {
 266                command,
 267                root_command,
 268                description,
 269            } => Self::Execute {
 270                command,
 271                root_command,
 272                description: description.map(|description| to_md(description, cx)),
 273            },
 274            acp::ToolCallConfirmation::Mcp {
 275                server_name,
 276                tool_name,
 277                tool_display_name,
 278                description,
 279            } => Self::Mcp {
 280                server_name,
 281                tool_name,
 282                tool_display_name,
 283                description: description.map(|description| to_md(description, cx)),
 284            },
 285            acp::ToolCallConfirmation::Fetch { urls, description } => Self::Fetch {
 286                urls: urls.iter().map(|url| url.into()).collect(),
 287                description: description.map(|description| to_md(description, cx)),
 288            },
 289            acp::ToolCallConfirmation::Other { description } => Self::Other {
 290                description: to_md(description, cx),
 291            },
 292        }
 293    }
 294}
 295
 296#[derive(Debug)]
 297pub enum ToolCallContent {
 298    Markdown { markdown: Entity<Markdown> },
 299    Diff { diff: Diff },
 300}
 301
 302impl ToolCallContent {
 303    pub fn from_acp(
 304        content: acp::ToolCallContent,
 305        language_registry: Arc<LanguageRegistry>,
 306        cx: &mut App,
 307    ) -> Self {
 308        match content {
 309            acp::ToolCallContent::Markdown { markdown } => Self::Markdown {
 310                markdown: cx.new(|cx| Markdown::new_text(markdown.into(), cx)),
 311            },
 312            acp::ToolCallContent::Diff { diff } => Self::Diff {
 313                diff: Diff::from_acp(diff, language_registry, cx),
 314            },
 315        }
 316    }
 317
 318    fn to_markdown(&self, cx: &App) -> String {
 319        match self {
 320            Self::Markdown { markdown } => markdown.read(cx).source().to_string(),
 321            Self::Diff { diff } => diff.to_markdown(cx),
 322        }
 323    }
 324}
 325
/// A diff for a single file, presented through a multibuffer.
#[derive(Debug)]
pub struct Diff {
    /// Read-only multibuffer populated asynchronously by `Diff::from_acp`.
    pub multibuffer: Entity<MultiBuffer>,
    /// Path of the file the diff applies to.
    pub path: PathBuf,
    /// Background work that fills `multibuffer`; stored so the `Diff` owns it.
    /// NOTE(review): presumably dropping the task cancels it — confirm against gpui.
    _task: Task<Result<()>>,
}
 332
 333impl Diff {
    /// Builds a [`Diff`] from a protocol-level diff.
    ///
    /// Local buffers are created for the old and new text (a missing old text
    /// is treated as empty), a `BufferDiff` is computed between them, and a
    /// spawned task then adds the changed hunks of the new buffer to the
    /// multibuffer and tries to assign a language based on `path`.
    ///
    /// The returned value is usable immediately; the multibuffer is filled in
    /// once the spawned task has run.
    pub fn from_acp(
        diff: acp::Diff,
        language_registry: Arc<LanguageRegistry>,
        cx: &mut App,
    ) -> Self {
        let acp::Diff {
            path,
            old_text,
            new_text,
        } = diff;

        let multibuffer = cx.new(|_cx| MultiBuffer::without_headers(Capability::ReadOnly));

        let new_buffer = cx.new(|cx| Buffer::local(new_text, cx));
        // No old text means the whole new text is diffed against an empty buffer.
        let old_buffer = cx.new(|cx| Buffer::local(old_text.unwrap_or("".into()), cx));
        let new_buffer_snapshot = new_buffer.read(cx).text_snapshot();
        let old_buffer_snapshot = old_buffer.read(cx).snapshot();
        let buffer_diff = cx.new(|cx| BufferDiff::new(&new_buffer_snapshot, cx));
        // Start the diff computation now; it is awaited inside the task below.
        let diff_task = buffer_diff.update(cx, |diff, cx| {
            diff.set_base_text(
                old_buffer_snapshot,
                Some(language_registry.clone()),
                new_buffer_snapshot,
                cx,
            )
        });

        let task = cx.spawn({
            let multibuffer = multibuffer.clone();
            let path = path.clone();
            async move |cx| {
                diff_task.await?;

                multibuffer
                    .update(cx, |multibuffer, cx| {
                        // Collect the ranges of every hunk across the whole new buffer.
                        let hunk_ranges = {
                            let buffer = new_buffer.read(cx);
                            let diff = buffer_diff.read(cx);
                            diff.hunks_intersecting_range(Anchor::MIN..Anchor::MAX, &buffer, cx)
                                .map(|diff_hunk| diff_hunk.buffer_range.to_point(&buffer))
                                .collect::<Vec<_>>()
                        };

                        multibuffer.set_excerpts_for_path(
                            PathKey::for_buffer(&new_buffer, cx),
                            new_buffer.clone(),
                            hunk_ranges,
                            editor::DEFAULT_MULTIBUFFER_CONTEXT,
                            cx,
                        );
                        multibuffer.add_diff(buffer_diff.clone(), cx);
                    })
                    // A failed update is logged and does not abort the task.
                    .log_err();

                // Language lookup is best-effort: a failed lookup is logged and skipped.
                if let Some(language) = language_registry
                    .language_for_file_path(&path)
                    .await
                    .log_err()
                {
                    new_buffer.update(cx, |buffer, cx| buffer.set_language(Some(language), cx))?;
                }

                anyhow::Ok(())
            }
        });

        Self {
            multibuffer,
            path,
            _task: task,
        }
    }
 406
 407    fn to_markdown(&self, cx: &App) -> String {
 408        let buffer_text = self
 409            .multibuffer
 410            .read(cx)
 411            .all_buffers()
 412            .iter()
 413            .map(|buffer| buffer.read(cx).text())
 414            .join("\n");
 415        format!("Diff: {}\n```\n{}\n```\n", self.path.display(), buffer_text)
 416    }
 417}
 418
/// A conversation with an agent over the Agentic Coding Protocol.
pub struct AcpThread {
    /// Entries in order; tool call ids are indices into this list.
    entries: Vec<AgentThreadEntry>,
    title: SharedString,
    /// Project whose language registry is used when creating Markdown and diffs.
    project: Entity<Project>,
    /// Task for the message currently being sent; `None` when the thread is idle.
    send_task: Option<Task<()>>,
    connection: Arc<acp::AgentConnection>,
    /// Exit result of the agent process; handed out once by `child_status()`.
    child_status: Option<Task<Result<()>>>,
    /// Task that awaits the connection's I/O future.
    _io_task: Task<()>,
}
 428
/// Events emitted by an [`AcpThread`].
pub enum AcpThreadEvent {
    /// An entry was appended to the thread.
    NewEntry,
    /// The entry at the given index changed.
    EntryUpdated(usize),
}

impl EventEmitter<AcpThreadEvent> for AcpThread {}
 435
/// Coarse state of an [`AcpThread`], as reported by `AcpThread::status`.
#[derive(PartialEq, Eq)]
pub enum ThreadStatus {
    /// No message is being sent.
    Idle,
    /// A message is being sent and a tool call is awaiting confirmation.
    WaitingForToolConfirmation,
    /// A message is being sent and no tool call is awaiting confirmation.
    Generating,
}
 442
 443#[derive(Debug, Clone)]
 444pub enum LoadError {
 445    Unsupported { current_version: SharedString },
 446    Exited(i32),
 447    Other(SharedString),
 448}
 449
 450impl Display for LoadError {
 451    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 452        match self {
 453            LoadError::Unsupported { current_version } => {
 454                write!(
 455                    f,
 456                    "Your installed version of Gemini {} doesn't support the Agentic Coding Protocol (ACP).",
 457                    current_version
 458                )
 459            }
 460            LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
 461            LoadError::Other(msg) => write!(f, "{}", msg),
 462        }
 463    }
 464}
 465
 466impl Error for LoadError {}
 467
 468impl AcpThread {
    /// Starts the agent server process and creates a thread connected to it.
    ///
    /// The command comes from `server.command`; the process runs in `root_dir`
    /// with piped stdin/stdout, inherited stderr, and is killed when dropped.
    ///
    /// # Errors
    ///
    /// Returns [`LoadError::Other`] (wrapped in `anyhow`) when the command
    /// cannot be resolved, and propagates the error when the process fails to
    /// spawn. Failures after startup are reported through the task returned
    /// by `child_status()`.
    pub async fn spawn(
        server: impl AgentServer + 'static,
        root_dir: &Path,
        project: Entity<Project>,
        cx: &mut AsyncApp,
    ) -> Result<Entity<Self>> {
        let command = match server.command(&project, cx).await {
            Ok(command) => command,
            Err(e) => return Err(anyhow!(LoadError::Other(format!("{e}").into()))),
        };

        let mut child = util::command::new_smol_command(&command.path)
            .args(command.args.iter())
            .current_dir(root_dir)
            .stdin(std::process::Stdio::piped())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::inherit())
            .kill_on_drop(true)
            .spawn()?;

        // Both handles were requested as piped above, so they are expected to be present.
        let stdin = child.stdin.take().unwrap();
        let stdout = child.stdout.take().unwrap();

        cx.new(|cx| {
            let foreground_executor = cx.foreground_executor().clone();

            let (connection, io_fut) = acp::AgentConnection::connect_to_agent(
                AcpClientDelegate::new(cx.entity().downgrade(), cx.to_async()),
                stdin,
                stdout,
                move |fut| foreground_executor.spawn(fut).detach(),
            );

            // Await the connection's I/O future in the background; errors are only logged.
            let io_task = cx.background_spawn(async move {
                io_fut.await.log_err();
            });

            // Translate the process exit into a `Result`: a clean exit is `Ok`, while a
            // failed exit becomes `Unsupported` when the version check says so, or
            // `Exited` otherwise.
            let child_status = cx.background_spawn(async move {
                match child.status().await {
                    Err(e) => Err(anyhow!(e)),
                    Ok(result) if result.success() => Ok(()),
                    Ok(result) => {
                        if let Some(version) = server.version(&command).await.log_err()
                            && !version.supported
                        {
                            Err(anyhow!(LoadError::Unsupported {
                                current_version: version.current_version
                            }))
                        } else {
                            // -127 stands in for a missing exit code.
                            Err(anyhow!(LoadError::Exited(result.code().unwrap_or(-127))))
                        }
                    }
                }
            });

            Self {
                entries: Default::default(),
                title: "ACP Thread".into(),
                project,
                send_task: None,
                connection: Arc::new(connection),
                child_status: Some(child_status),
                _io_task: io_task,
            }
        })
    }
 535
 536    #[cfg(test)]
 537    pub fn fake(
 538        stdin: async_pipe::PipeWriter,
 539        stdout: async_pipe::PipeReader,
 540        project: Entity<Project>,
 541        cx: &mut Context<Self>,
 542    ) -> Self {
 543        let foreground_executor = cx.foreground_executor().clone();
 544
 545        let (connection, io_fut) = acp::AgentConnection::connect_to_agent(
 546            AcpClientDelegate::new(cx.entity().downgrade(), cx.to_async()),
 547            stdin,
 548            stdout,
 549            move |fut| {
 550                foreground_executor.spawn(fut).detach();
 551            },
 552        );
 553
 554        let io_task = cx.background_spawn({
 555            async move {
 556                io_fut.await.log_err();
 557            }
 558        });
 559
 560        Self {
 561            entries: Default::default(),
 562            title: "ACP Thread".into(),
 563            project,
 564            send_task: None,
 565            connection: Arc::new(connection),
 566            child_status: None,
 567            _io_task: io_task,
 568        }
 569    }
 570
    /// Returns a clone of the thread's title.
    pub fn title(&self) -> SharedString {
        self.title.clone()
    }
 574
    /// Returns the thread's entries in order.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
 578
 579    pub fn status(&self) -> ThreadStatus {
 580        if self.send_task.is_some() {
 581            if self.waiting_for_tool_confirmation() {
 582                ThreadStatus::WaitingForToolConfirmation
 583            } else {
 584                ThreadStatus::Generating
 585            }
 586        } else {
 587            ThreadStatus::Idle
 588        }
 589    }
 590
    /// Appends `entry` to the thread and emits [`AcpThreadEvent::NewEntry`].
    pub fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
 595
    /// Adds a streamed assistant chunk to the thread.
    ///
    /// When the last entry is an assistant message, the chunk is merged into
    /// it: text is appended to the last chunk if both are of the same kind
    /// (text or thought), otherwise a new chunk is pushed. In that case
    /// `EntryUpdated` is emitted for the last entry. When the last entry is
    /// anything else (or the thread is empty), a new assistant message entry
    /// is pushed instead.
    pub fn push_assistant_chunk(
        &mut self,
        chunk: acp::AssistantMessageChunk,
        cx: &mut Context<Self>,
    ) {
        // Captured up front because `last_mut` keeps `self.entries` borrowed below.
        let entries_len = self.entries.len();
        if let Some(last_entry) = self.entries.last_mut()
            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
        {
            cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));

            match (chunks.last_mut(), &chunk) {
                // Same kind as the previous chunk: extend it in place.
                (
                    Some(AssistantMessageChunk::Text { chunk: old_chunk }),
                    acp::AssistantMessageChunk::Text { chunk: new_chunk },
                )
                | (
                    Some(AssistantMessageChunk::Thought { chunk: old_chunk }),
                    acp::AssistantMessageChunk::Thought { chunk: new_chunk },
                ) => {
                    old_chunk.update(cx, |old_chunk, cx| {
                        old_chunk.append(&new_chunk, cx);
                    });
                }
                // Different kind (or no previous chunk): start a new chunk.
                _ => {
                    chunks.push(AssistantMessageChunk::from_acp(
                        chunk,
                        self.project.read(cx).languages().clone(),
                        cx,
                    ));
                }
            }
        } else {
            let chunk = AssistantMessageChunk::from_acp(
                chunk,
                self.project.read(cx).languages().clone(),
                cx,
            );

            self.push_entry(
                AgentThreadEntry::AssistantMessage(AssistantMessage {
                    chunks: vec![chunk],
                }),
                cx,
            );
        }
    }
 643
 644    pub fn request_tool_call(
 645        &mut self,
 646        label: String,
 647        icon: acp::Icon,
 648        content: Option<acp::ToolCallContent>,
 649        confirmation: acp::ToolCallConfirmation,
 650        cx: &mut Context<Self>,
 651    ) -> ToolCallRequest {
 652        let (tx, rx) = oneshot::channel();
 653
 654        let status = ToolCallStatus::WaitingForConfirmation {
 655            confirmation: ToolCallConfirmation::from_acp(
 656                confirmation,
 657                self.project.read(cx).languages().clone(),
 658                cx,
 659            ),
 660            respond_tx: tx,
 661        };
 662
 663        let id = self.insert_tool_call(label, status, icon, content, cx);
 664        ToolCallRequest { id, outcome: rx }
 665    }
 666
 667    pub fn push_tool_call(
 668        &mut self,
 669        label: String,
 670        icon: acp::Icon,
 671        content: Option<acp::ToolCallContent>,
 672        cx: &mut Context<Self>,
 673    ) -> acp::ToolCallId {
 674        let status = ToolCallStatus::Allowed {
 675            status: acp::ToolCallStatus::Running,
 676        };
 677
 678        self.insert_tool_call(label, status, icon, content, cx)
 679    }
 680
 681    fn insert_tool_call(
 682        &mut self,
 683        label: String,
 684        status: ToolCallStatus,
 685        icon: acp::Icon,
 686        content: Option<acp::ToolCallContent>,
 687        cx: &mut Context<Self>,
 688    ) -> acp::ToolCallId {
 689        let language_registry = self.project.read(cx).languages().clone();
 690        let id = acp::ToolCallId(self.entries.len() as u64);
 691
 692        self.push_entry(
 693            AgentThreadEntry::ToolCall(ToolCall {
 694                id,
 695                label: cx.new(|cx| {
 696                    Markdown::new(label.into(), Some(language_registry.clone()), None, cx)
 697                }),
 698                icon: acp_icon_to_ui_icon(icon),
 699                content: content
 700                    .map(|content| ToolCallContent::from_acp(content, language_registry, cx)),
 701                status,
 702            }),
 703            cx,
 704        );
 705
 706        id
 707    }
 708
 709    pub fn authorize_tool_call(
 710        &mut self,
 711        id: acp::ToolCallId,
 712        outcome: acp::ToolCallConfirmationOutcome,
 713        cx: &mut Context<Self>,
 714    ) {
 715        let Some((ix, call)) = self.tool_call_mut(id) else {
 716            return;
 717        };
 718
 719        let new_status = if outcome == acp::ToolCallConfirmationOutcome::Reject {
 720            ToolCallStatus::Rejected
 721        } else {
 722            ToolCallStatus::Allowed {
 723                status: acp::ToolCallStatus::Running,
 724            }
 725        };
 726
 727        let curr_status = mem::replace(&mut call.status, new_status);
 728
 729        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
 730            respond_tx.send(outcome).log_err();
 731        } else if cfg!(debug_assertions) {
 732            panic!("tried to authorize an already authorized tool call");
 733        }
 734
 735        cx.emit(AcpThreadEvent::EntryUpdated(ix));
 736    }
 737
 738    pub fn update_tool_call(
 739        &mut self,
 740        id: acp::ToolCallId,
 741        new_status: acp::ToolCallStatus,
 742        new_content: Option<acp::ToolCallContent>,
 743        cx: &mut Context<Self>,
 744    ) -> Result<()> {
 745        let language_registry = self.project.read(cx).languages().clone();
 746        let (ix, call) = self.tool_call_mut(id).context("Entry not found")?;
 747
 748        call.content = new_content
 749            .map(|new_content| ToolCallContent::from_acp(new_content, language_registry, cx));
 750
 751        match &mut call.status {
 752            ToolCallStatus::Allowed { status } => {
 753                *status = new_status;
 754            }
 755            ToolCallStatus::WaitingForConfirmation { .. } => {
 756                anyhow::bail!("Tool call hasn't been authorized yet")
 757            }
 758            ToolCallStatus::Rejected => {
 759                anyhow::bail!("Tool call was rejected and therefore can't be updated")
 760            }
 761            ToolCallStatus::Canceled => {
 762                call.status = ToolCallStatus::Allowed { status: new_status };
 763            }
 764        }
 765
 766        cx.emit(AcpThreadEvent::EntryUpdated(ix));
 767        Ok(())
 768    }
 769
 770    fn tool_call_mut(&mut self, id: acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
 771        let entry = self.entries.get_mut(id.0 as usize);
 772        debug_assert!(
 773            entry.is_some(),
 774            "We shouldn't give out ids to entries that don't exist"
 775        );
 776        match entry {
 777            Some(AgentThreadEntry::ToolCall(call)) if call.id == id => Some((id.0 as usize, call)),
 778            _ => {
 779                if cfg!(debug_assertions) {
 780                    panic!("entry is not a tool call");
 781                }
 782                None
 783            }
 784        }
 785    }
 786
 787    /// Returns true if the last turn is awaiting tool authorization
 788    pub fn waiting_for_tool_confirmation(&self) -> bool {
 789        for entry in self.entries.iter().rev() {
 790            match &entry {
 791                AgentThreadEntry::ToolCall(call) => match call.status {
 792                    ToolCallStatus::WaitingForConfirmation { .. } => return true,
 793                    ToolCallStatus::Allowed { .. }
 794                    | ToolCallStatus::Rejected
 795                    | ToolCallStatus::Canceled => continue,
 796                },
 797                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
 798                    // Reached the beginning of the turn
 799                    return false;
 800                }
 801            }
 802        }
 803        false
 804    }
 805
 806    pub fn initialize(&self) -> impl use<> + Future<Output = Result<acp::InitializeResponse>> {
 807        let connection = self.connection.clone();
 808        async move { Ok(connection.request(acp::InitializeParams).await?) }
 809    }
 810
 811    pub fn authenticate(&self) -> impl use<> + Future<Output = Result<()>> {
 812        let connection = self.connection.clone();
 813        async move { Ok(connection.request(acp::AuthenticateParams).await?) }
 814    }
 815
    /// Sends a user message to the agent.
    ///
    /// The message is appended to the thread immediately. Any send already in
    /// flight is canceled first, and the new request is only issued once that
    /// cancellation has finished.
    ///
    /// The returned future resolves to an error only when the agent request
    /// itself fails. If the result is never delivered (the sending side of
    /// the channel was dropped), it resolves to `Ok(())`.
    pub fn send(
        &mut self,
        message: impl Into<acp::UserMessage>,
        cx: &mut Context<Self>,
    ) -> BoxFuture<'static, Result<()>> {
        let agent = self.connection.clone();
        let message = message.into();
        self.push_entry(
            AgentThreadEntry::UserMessage(UserMessage::from_acp(
                message.clone(),
                self.project.read(cx).languages().clone(),
                cx,
            )),
            cx,
        );

        let (tx, rx) = oneshot::channel();
        // `cancel` takes the previous `send_task`, so this must run before the new one is stored.
        let cancel = self.cancel(cx);

        self.send_task = Some(cx.spawn(async move |this, cx| {
            // A failed cancellation is logged; the new message is sent regardless.
            cancel.await.log_err();

            let result = agent.request(acp::SendUserMessageParams { message }).await;
            tx.send(result).log_err();
            // Clear the task so `status()` reports `Idle` again.
            this.update(cx, |this, _cx| this.send_task.take()).log_err();
        }));

        async move {
            match rx.await {
                Ok(Err(e)) => Err(e)?,
                // Covers both a successful response and a dropped sender.
                _ => Ok(()),
            }
        }
        .boxed()
    }
 851
 852    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 853        let agent = self.connection.clone();
 854
 855        if self.send_task.take().is_some() {
 856            cx.spawn(async move |this, cx| {
 857                agent.request(acp::CancelSendMessageParams).await?;
 858
 859                this.update(cx, |this, _cx| {
 860                    for entry in this.entries.iter_mut() {
 861                        if let AgentThreadEntry::ToolCall(call) = entry {
 862                            let cancel = matches!(
 863                                call.status,
 864                                ToolCallStatus::WaitingForConfirmation { .. }
 865                                    | ToolCallStatus::Allowed {
 866                                        status: acp::ToolCallStatus::Running
 867                                    }
 868                            );
 869
 870                            if cancel {
 871                                let curr_status =
 872                                    mem::replace(&mut call.status, ToolCallStatus::Canceled);
 873
 874                                if let ToolCallStatus::WaitingForConfirmation {
 875                                    respond_tx, ..
 876                                } = curr_status
 877                                {
 878                                    respond_tx
 879                                        .send(acp::ToolCallConfirmationOutcome::Cancel)
 880                                        .ok();
 881                                }
 882                            }
 883                        }
 884                    }
 885                })
 886            })
 887        } else {
 888            Task::ready(Ok(()))
 889        }
 890    }
 891
    /// Takes the task that resolves with the agent process's exit result.
    ///
    /// Returns `None` once the task has been taken, and for threads created
    /// without a child process.
    pub fn child_status(&mut self) -> Option<Task<Result<()>>> {
        self.child_status.take()
    }
 895
 896    pub fn to_markdown(&self, cx: &App) -> String {
 897        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
 898    }
 899}
 900
/// Client-side handler that applies agent requests to an [`AcpThread`].
struct AcpClientDelegate {
    /// Weak handle, so the delegate does not keep the thread alive.
    thread: WeakEntity<AcpThread>,
    /// Async app handle, cloned for each request to update the thread.
    cx: AsyncApp,
    // sent_buffer_versions: HashMap<Entity<Buffer>, HashMap<u64, BufferSnapshot>>,
}

impl AcpClientDelegate {
    /// Creates a delegate for `thread` using the given async app handle.
    fn new(thread: WeakEntity<AcpThread>, cx: AsyncApp) -> Self {
        Self { thread, cx }
    }
}
 912
 913impl acp::Client for AcpClientDelegate {
 914    async fn stream_assistant_message_chunk(
 915        &self,
 916        params: acp::StreamAssistantMessageChunkParams,
 917    ) -> Result<()> {
 918        let cx = &mut self.cx.clone();
 919
 920        cx.update(|cx| {
 921            self.thread
 922                .update(cx, |thread, cx| {
 923                    thread.push_assistant_chunk(params.chunk, cx)
 924                })
 925                .ok();
 926        })?;
 927
 928        Ok(())
 929    }
 930
 931    async fn request_tool_call_confirmation(
 932        &self,
 933        request: acp::RequestToolCallConfirmationParams,
 934    ) -> Result<acp::RequestToolCallConfirmationResponse> {
 935        let cx = &mut self.cx.clone();
 936        let ToolCallRequest { id, outcome } = cx
 937            .update(|cx| {
 938                self.thread.update(cx, |thread, cx| {
 939                    thread.request_tool_call(
 940                        request.label,
 941                        request.icon,
 942                        request.content,
 943                        request.confirmation,
 944                        cx,
 945                    )
 946                })
 947            })?
 948            .context("Failed to update thread")?;
 949
 950        Ok(acp::RequestToolCallConfirmationResponse {
 951            id,
 952            outcome: outcome.await?,
 953        })
 954    }
 955
 956    async fn push_tool_call(
 957        &self,
 958        request: acp::PushToolCallParams,
 959    ) -> Result<acp::PushToolCallResponse> {
 960        let cx = &mut self.cx.clone();
 961        let id = cx
 962            .update(|cx| {
 963                self.thread.update(cx, |thread, cx| {
 964                    thread.push_tool_call(request.label, request.icon, request.content, cx)
 965                })
 966            })?
 967            .context("Failed to update thread")?;
 968
 969        Ok(acp::PushToolCallResponse { id })
 970    }
 971
 972    async fn update_tool_call(&self, request: acp::UpdateToolCallParams) -> Result<()> {
 973        let cx = &mut self.cx.clone();
 974
 975        cx.update(|cx| {
 976            self.thread.update(cx, |thread, cx| {
 977                thread.update_tool_call(request.tool_call_id, request.status, request.content, cx)
 978            })
 979        })?
 980        .context("Failed to update thread")??;
 981
 982        Ok(())
 983    }
 984}
 985
 986fn acp_icon_to_ui_icon(icon: acp::Icon) -> IconName {
 987    match icon {
 988        acp::Icon::FileSearch => IconName::ToolSearch,
 989        acp::Icon::Folder => IconName::ToolFolder,
 990        acp::Icon::Globe => IconName::ToolWeb,
 991        acp::Icon::Hammer => IconName::ToolHammer,
 992        acp::Icon::LightBulb => IconName::ToolBulb,
 993        acp::Icon::Pencil => IconName::ToolPencil,
 994        acp::Icon::Regex => IconName::ToolRegex,
 995        acp::Icon::Terminal => IconName::ToolTerminal,
 996    }
 997}
 998
/// A tool call whose confirmation outcome is still pending.
pub struct ToolCallRequest {
    /// Identifier of the tool call.
    pub id: acp::ToolCallId,
    /// Receiver that yields the confirmation outcome once one has been sent.
    pub outcome: oneshot::Receiver<acp::ToolCallConfirmationOutcome>,
}
1003
1004#[cfg(test)]
1005mod tests {
1006    use super::*;
1007    use agent_servers::{AgentServerCommand, AgentServerVersion};
1008    use async_pipe::{PipeReader, PipeWriter};
1009    use futures::{channel::mpsc, future::LocalBoxFuture, select};
1010    use gpui::{AsyncApp, TestAppContext};
1011    use indoc::indoc;
1012    use project::FakeFs;
1013    use serde_json::json;
1014    use settings::SettingsStore;
1015    use smol::{future::BoxedLocal, stream::StreamExt as _};
1016    use std::{cell::RefCell, env, path::Path, rc::Rc, time::Duration};
1017    use util::path;
1018
1019    fn init_test(cx: &mut TestAppContext) {
1020        env_logger::try_init().ok();
1021        cx.update(|cx| {
1022            let settings_store = SettingsStore::test(cx);
1023            cx.set_global(settings_store);
1024            Project::init_settings(cx);
1025            language::init(cx);
1026        });
1027    }
1028
1029    #[gpui::test]
1030    async fn test_thinking_concatenation(cx: &mut TestAppContext) {
1031        init_test(cx);
1032
1033        cx.executor().allow_parking();
1034
1035        let fs = FakeFs::new(cx.executor());
1036        let project = Project::test(fs, [], cx).await;
1037        let (thread, fake_server) = fake_acp_thread(project, cx);
1038
1039        fake_server.update(cx, |fake_server, _| {
1040            fake_server.on_user_message(move |_, server, mut cx| async move {
1041                server
1042                    .update(&mut cx, |server, _| {
1043                        server.send_to_zed(acp::StreamAssistantMessageChunkParams {
1044                            chunk: acp::AssistantMessageChunk::Thought {
1045                                chunk: "Thinking ".into(),
1046                            },
1047                        })
1048                    })?
1049                    .await
1050                    .unwrap();
1051                server
1052                    .update(&mut cx, |server, _| {
1053                        server.send_to_zed(acp::StreamAssistantMessageChunkParams {
1054                            chunk: acp::AssistantMessageChunk::Thought {
1055                                chunk: "hard!".into(),
1056                            },
1057                        })
1058                    })?
1059                    .await
1060                    .unwrap();
1061
1062                Ok(())
1063            })
1064        });
1065
1066        thread
1067            .update(cx, |thread, cx| thread.send("Hello from Zed!", cx))
1068            .await
1069            .unwrap();
1070
1071        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
1072        assert_eq!(
1073            output,
1074            indoc! {r#"
1075            ## User
1076
1077            Hello from Zed!
1078
1079            ## Assistant
1080
1081            <thinking>
1082            Thinking hard!
1083            </thinking>
1084
1085            "#}
1086        );
1087    }
1088
1089    #[gpui::test]
1090    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
1091        init_test(cx);
1092
1093        let fs = FakeFs::new(cx.executor());
1094        let project = Project::test(fs, [], cx).await;
1095        let (thread, fake_server) = fake_acp_thread(project, cx);
1096
1097        let (end_turn_tx, end_turn_rx) = oneshot::channel::<()>();
1098
1099        let tool_call_id = Rc::new(RefCell::new(None));
1100        let end_turn_rx = Rc::new(RefCell::new(Some(end_turn_rx)));
1101        fake_server.update(cx, |fake_server, _| {
1102            let tool_call_id = tool_call_id.clone();
1103            fake_server.on_user_message(move |_, server, mut cx| {
1104                let end_turn_rx = end_turn_rx.clone();
1105                let tool_call_id = tool_call_id.clone();
1106                async move {
1107                    let tool_call_result = server
1108                        .update(&mut cx, |server, _| {
1109                            server.send_to_zed(acp::PushToolCallParams {
1110                                label: "Fetch".to_string(),
1111                                icon: acp::Icon::Globe,
1112                                content: None,
1113                            })
1114                        })?
1115                        .await
1116                        .unwrap();
1117                    *tool_call_id.clone().borrow_mut() = Some(tool_call_result.id);
1118                    end_turn_rx.take().unwrap().await.ok();
1119
1120                    Ok(())
1121                }
1122            })
1123        });
1124
1125        let request = thread.update(cx, |thread, cx| {
1126            thread.send("Fetch https://example.com", cx)
1127        });
1128
1129        run_until_first_tool_call(&thread, cx).await;
1130
1131        thread.read_with(cx, |thread, _| {
1132            assert!(matches!(
1133                thread.entries[1],
1134                AgentThreadEntry::ToolCall(ToolCall {
1135                    status: ToolCallStatus::Allowed {
1136                        status: acp::ToolCallStatus::Running,
1137                        ..
1138                    },
1139                    ..
1140                })
1141            ));
1142        });
1143
1144        cx.run_until_parked();
1145
1146        thread
1147            .update(cx, |thread, cx| thread.cancel(cx))
1148            .await
1149            .unwrap();
1150
1151        thread.read_with(cx, |thread, _| {
1152            assert!(matches!(
1153                &thread.entries[1],
1154                AgentThreadEntry::ToolCall(ToolCall {
1155                    status: ToolCallStatus::Canceled,
1156                    ..
1157                })
1158            ));
1159        });
1160
1161        fake_server
1162            .update(cx, |fake_server, _| {
1163                fake_server.send_to_zed(acp::UpdateToolCallParams {
1164                    tool_call_id: tool_call_id.borrow().unwrap(),
1165                    status: acp::ToolCallStatus::Finished,
1166                    content: None,
1167                })
1168            })
1169            .await
1170            .unwrap();
1171
1172        drop(end_turn_tx);
1173        request.await.unwrap();
1174
1175        thread.read_with(cx, |thread, _| {
1176            assert!(matches!(
1177                thread.entries[1],
1178                AgentThreadEntry::ToolCall(ToolCall {
1179                    status: ToolCallStatus::Allowed {
1180                        status: acp::ToolCallStatus::Finished,
1181                        ..
1182                    },
1183                    ..
1184                })
1185            ));
1186        });
1187    }
1188
1189    #[gpui::test]
1190    #[cfg_attr(not(feature = "gemini"), ignore)]
1191    async fn test_gemini_basic(cx: &mut TestAppContext) {
1192        init_test(cx);
1193
1194        cx.executor().allow_parking();
1195
1196        let fs = FakeFs::new(cx.executor());
1197        let project = Project::test(fs, [], cx).await;
1198        let thread = gemini_acp_thread(project.clone(), "/private/tmp", cx).await;
1199        thread
1200            .update(cx, |thread, cx| thread.send("Hello from Zed!", cx))
1201            .await
1202            .unwrap();
1203
1204        thread.read_with(cx, |thread, _| {
1205            assert_eq!(thread.entries.len(), 2);
1206            assert!(matches!(
1207                thread.entries[0],
1208                AgentThreadEntry::UserMessage(_)
1209            ));
1210            assert!(matches!(
1211                thread.entries[1],
1212                AgentThreadEntry::AssistantMessage(_)
1213            ));
1214        });
1215    }
1216
1217    #[gpui::test]
1218    #[cfg_attr(not(feature = "gemini"), ignore)]
1219    async fn test_gemini_path_mentions(cx: &mut TestAppContext) {
1220        init_test(cx);
1221
1222        cx.executor().allow_parking();
1223        let tempdir = tempfile::tempdir().unwrap();
1224        std::fs::write(
1225            tempdir.path().join("foo.rs"),
1226            indoc! {"
1227                fn main() {
1228                    println!(\"Hello, world!\");
1229                }
1230            "},
1231        )
1232        .expect("failed to write file");
1233        let project = Project::example([tempdir.path()], &mut cx.to_async()).await;
1234        let thread = gemini_acp_thread(project.clone(), tempdir.path(), cx).await;
1235        thread
1236            .update(cx, |thread, cx| {
1237                thread.send(
1238                    acp::UserMessage {
1239                        chunks: vec![
1240                            "Read the file ".into(),
1241                            Path::new("foo.rs").into(),
1242                            " and tell me what the content of the println! is".into(),
1243                        ],
1244                    },
1245                    cx,
1246                )
1247            })
1248            .await
1249            .unwrap();
1250
1251        thread.read_with(cx, |thread, cx| {
1252            assert_eq!(thread.entries.len(), 3);
1253            assert!(matches!(
1254                thread.entries[0],
1255                AgentThreadEntry::UserMessage(_)
1256            ));
1257            assert!(matches!(thread.entries[1], AgentThreadEntry::ToolCall(_)));
1258            let AgentThreadEntry::AssistantMessage(assistant_message) = &thread.entries[2] else {
1259                panic!("Expected AssistantMessage")
1260            };
1261            assert!(
1262                assistant_message.to_markdown(cx).contains("Hello, world!"),
1263                "unexpected assistant message: {:?}",
1264                assistant_message.to_markdown(cx)
1265            );
1266        });
1267    }
1268
1269    #[gpui::test]
1270    #[cfg_attr(not(feature = "gemini"), ignore)]
1271    async fn test_gemini_tool_call(cx: &mut TestAppContext) {
1272        init_test(cx);
1273
1274        cx.executor().allow_parking();
1275
1276        let fs = FakeFs::new(cx.executor());
1277        fs.insert_tree(
1278            path!("/private/tmp"),
1279            json!({"foo": "Lorem ipsum dolor", "bar": "bar", "baz": "baz"}),
1280        )
1281        .await;
1282        let project = Project::test(fs, [path!("/private/tmp").as_ref()], cx).await;
1283        let thread = gemini_acp_thread(project.clone(), "/private/tmp", cx).await;
1284        thread
1285            .update(cx, |thread, cx| {
1286                thread.send(
1287                    "Read the '/private/tmp/foo' file and tell me what you see.",
1288                    cx,
1289                )
1290            })
1291            .await
1292            .unwrap();
1293        thread.read_with(cx, |thread, _cx| {
1294            assert!(matches!(
1295                &thread.entries()[2],
1296                AgentThreadEntry::ToolCall(ToolCall {
1297                    status: ToolCallStatus::Allowed { .. },
1298                    ..
1299                })
1300            ));
1301
1302            assert!(matches!(
1303                thread.entries[3],
1304                AgentThreadEntry::AssistantMessage(_)
1305            ));
1306        });
1307    }
1308
1309    #[gpui::test]
1310    #[cfg_attr(not(feature = "gemini"), ignore)]
1311    async fn test_gemini_tool_call_with_confirmation(cx: &mut TestAppContext) {
1312        init_test(cx);
1313
1314        cx.executor().allow_parking();
1315
1316        let fs = FakeFs::new(cx.executor());
1317        let project = Project::test(fs, [path!("/private/tmp").as_ref()], cx).await;
1318        let thread = gemini_acp_thread(project.clone(), "/private/tmp", cx).await;
1319        let full_turn = thread.update(cx, |thread, cx| {
1320            thread.send(r#"Run `echo "Hello, world!"`"#, cx)
1321        });
1322
1323        run_until_first_tool_call(&thread, cx).await;
1324
1325        let tool_call_id = thread.read_with(cx, |thread, _cx| {
1326            let AgentThreadEntry::ToolCall(ToolCall {
1327                id,
1328                status:
1329                    ToolCallStatus::WaitingForConfirmation {
1330                        confirmation: ToolCallConfirmation::Execute { root_command, .. },
1331                        ..
1332                    },
1333                ..
1334            }) = &thread.entries()[2]
1335            else {
1336                panic!();
1337            };
1338
1339            assert_eq!(root_command, "echo");
1340
1341            *id
1342        });
1343
1344        thread.update(cx, |thread, cx| {
1345            thread.authorize_tool_call(tool_call_id, acp::ToolCallConfirmationOutcome::Allow, cx);
1346
1347            assert!(matches!(
1348                &thread.entries()[2],
1349                AgentThreadEntry::ToolCall(ToolCall {
1350                    status: ToolCallStatus::Allowed { .. },
1351                    ..
1352                })
1353            ));
1354        });
1355
1356        full_turn.await.unwrap();
1357
1358        thread.read_with(cx, |thread, cx| {
1359            let AgentThreadEntry::ToolCall(ToolCall {
1360                content: Some(ToolCallContent::Markdown { markdown }),
1361                status: ToolCallStatus::Allowed { .. },
1362                ..
1363            }) = &thread.entries()[2]
1364            else {
1365                panic!();
1366            };
1367
1368            markdown.read_with(cx, |md, _cx| {
1369                assert!(
1370                    md.source().contains("Hello, world!"),
1371                    r#"Expected '{}' to contain "Hello, world!""#,
1372                    md.source()
1373                );
1374            });
1375        });
1376    }
1377
1378    #[gpui::test]
1379    #[cfg_attr(not(feature = "gemini"), ignore)]
1380    async fn test_gemini_cancel(cx: &mut TestAppContext) {
1381        init_test(cx);
1382
1383        cx.executor().allow_parking();
1384
1385        let fs = FakeFs::new(cx.executor());
1386        let project = Project::test(fs, [path!("/private/tmp").as_ref()], cx).await;
1387        let thread = gemini_acp_thread(project.clone(), "/private/tmp", cx).await;
1388        let full_turn = thread.update(cx, |thread, cx| {
1389            thread.send(r#"Run `echo "Hello, world!"`"#, cx)
1390        });
1391
1392        let first_tool_call_ix = run_until_first_tool_call(&thread, cx).await;
1393
1394        thread.read_with(cx, |thread, _cx| {
1395            let AgentThreadEntry::ToolCall(ToolCall {
1396                id,
1397                status:
1398                    ToolCallStatus::WaitingForConfirmation {
1399                        confirmation: ToolCallConfirmation::Execute { root_command, .. },
1400                        ..
1401                    },
1402                ..
1403            }) = &thread.entries()[first_tool_call_ix]
1404            else {
1405                panic!("{:?}", thread.entries()[1]);
1406            };
1407
1408            assert_eq!(root_command, "echo");
1409
1410            *id
1411        });
1412
1413        thread
1414            .update(cx, |thread, cx| thread.cancel(cx))
1415            .await
1416            .unwrap();
1417        full_turn.await.unwrap();
1418        thread.read_with(cx, |thread, _| {
1419            let AgentThreadEntry::ToolCall(ToolCall {
1420                status: ToolCallStatus::Canceled,
1421                ..
1422            }) = &thread.entries()[first_tool_call_ix]
1423            else {
1424                panic!();
1425            };
1426        });
1427
1428        thread
1429            .update(cx, |thread, cx| {
1430                thread.send(r#"Stop running and say goodbye to me."#, cx)
1431            })
1432            .await
1433            .unwrap();
1434        thread.read_with(cx, |thread, _| {
1435            assert!(matches!(
1436                &thread.entries().last().unwrap(),
1437                AgentThreadEntry::AssistantMessage(..),
1438            ))
1439        });
1440    }
1441
1442    async fn run_until_first_tool_call(
1443        thread: &Entity<AcpThread>,
1444        cx: &mut TestAppContext,
1445    ) -> usize {
1446        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
1447
1448        let subscription = cx.update(|cx| {
1449            cx.subscribe(thread, move |thread, _, cx| {
1450                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
1451                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
1452                        return tx.try_send(ix).unwrap();
1453                    }
1454                }
1455            })
1456        });
1457
1458        select! {
1459            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
1460                panic!("Timeout waiting for tool call")
1461            }
1462            ix = rx.next().fuse() => {
1463                drop(subscription);
1464                ix.unwrap()
1465            }
1466        }
1467    }
1468
1469    pub async fn gemini_acp_thread(
1470        project: Entity<Project>,
1471        current_dir: impl AsRef<Path>,
1472        cx: &mut TestAppContext,
1473    ) -> Entity<AcpThread> {
1474        struct DevGemini;
1475
1476        impl agent_servers::AgentServer for DevGemini {
1477            async fn command(
1478                &self,
1479                _project: &Entity<Project>,
1480                _cx: &mut AsyncApp,
1481            ) -> Result<agent_servers::AgentServerCommand> {
1482                let cli_path = Path::new(env!("CARGO_MANIFEST_DIR"))
1483                    .join("../../../gemini-cli/packages/cli")
1484                    .to_string_lossy()
1485                    .to_string();
1486
1487                Ok(AgentServerCommand {
1488                    path: "node".into(),
1489                    args: vec![cli_path, "--acp".into()],
1490                    env: None,
1491                })
1492            }
1493
1494            async fn version(
1495                &self,
1496                _command: &agent_servers::AgentServerCommand,
1497            ) -> Result<AgentServerVersion> {
1498                Ok(AgentServerVersion {
1499                    current_version: "0.1.0".into(),
1500                    supported: true,
1501                })
1502            }
1503        }
1504
1505        let thread = AcpThread::spawn(DevGemini, current_dir.as_ref(), project, &mut cx.to_async())
1506            .await
1507            .unwrap();
1508
1509        thread
1510            .update(cx, |thread, _| thread.initialize())
1511            .await
1512            .unwrap();
1513        thread
1514    }
1515
1516    pub fn fake_acp_thread(
1517        project: Entity<Project>,
1518        cx: &mut TestAppContext,
1519    ) -> (Entity<AcpThread>, Entity<FakeAcpServer>) {
1520        let (stdin_tx, stdin_rx) = async_pipe::pipe();
1521        let (stdout_tx, stdout_rx) = async_pipe::pipe();
1522        let thread = cx.update(|cx| cx.new(|cx| AcpThread::fake(stdin_tx, stdout_rx, project, cx)));
1523        let agent = cx.update(|cx| cx.new(|cx| FakeAcpServer::new(stdin_rx, stdout_tx, cx)));
1524        (thread, agent)
1525    }
1526
    /// In-process stand-in for an agent server, used by tests to script agent
    /// behavior and to send requests to the thread under test.
    pub struct FakeAcpServer {
        /// Connection used to send requests to the client side.
        connection: acp::ClientConnection,
        /// Task driving the connection's I/O; held so it lives as long as the server.
        _io_task: Task<()>,
        /// Optional handler invoked for each user message; it receives the
        /// message parameters, the server entity and an async app context.
        on_user_message: Option<
            Rc<
                dyn Fn(
                    acp::SendUserMessageParams,
                    Entity<FakeAcpServer>,
                    AsyncApp,
                ) -> LocalBoxFuture<'static, Result<()>>,
            >,
        >,
    }
1540
    /// `acp::Agent` implementation that delegates user messages to the handler
    /// registered on a [`FakeAcpServer`].
    #[derive(Clone)]
    struct FakeAgent {
        /// Server entity whose `on_user_message` handler is looked up per message.
        server: Entity<FakeAcpServer>,
        /// Async app context used to read the server and passed on to the handler.
        cx: AsyncApp,
    }
1546
1547    impl acp::Agent for FakeAgent {
1548        async fn initialize(&self) -> Result<acp::InitializeResponse> {
1549            Ok(acp::InitializeResponse {
1550                is_authenticated: true,
1551            })
1552        }
1553
1554        async fn authenticate(&self) -> Result<()> {
1555            Ok(())
1556        }
1557
1558        async fn cancel_send_message(&self) -> Result<()> {
1559            Ok(())
1560        }
1561
1562        async fn send_user_message(&self, request: acp::SendUserMessageParams) -> Result<()> {
1563            let mut cx = self.cx.clone();
1564            let handler = self
1565                .server
1566                .update(&mut cx, |server, _| server.on_user_message.clone())
1567                .ok()
1568                .flatten();
1569            if let Some(handler) = handler {
1570                handler(request, self.server.clone(), self.cx.clone()).await
1571            } else {
1572                anyhow::bail!("No handler for on_user_message")
1573            }
1574        }
1575    }
1576
1577    impl FakeAcpServer {
1578        fn new(stdin: PipeReader, stdout: PipeWriter, cx: &Context<Self>) -> Self {
1579            let agent = FakeAgent {
1580                server: cx.entity(),
1581                cx: cx.to_async(),
1582            };
1583            let foreground_executor = cx.foreground_executor().clone();
1584
1585            let (connection, io_fut) = acp::ClientConnection::connect_to_client(
1586                agent.clone(),
1587                stdout,
1588                stdin,
1589                move |fut| {
1590                    foreground_executor.spawn(fut).detach();
1591                },
1592            );
1593            FakeAcpServer {
1594                connection: connection,
1595                on_user_message: None,
1596                _io_task: cx.background_spawn(async move {
1597                    io_fut.await.log_err();
1598                }),
1599            }
1600        }
1601
1602        fn on_user_message<F>(
1603            &mut self,
1604            handler: impl for<'a> Fn(acp::SendUserMessageParams, Entity<FakeAcpServer>, AsyncApp) -> F
1605            + 'static,
1606        ) where
1607            F: Future<Output = Result<()>> + 'static,
1608        {
1609            self.on_user_message
1610                .replace(Rc::new(move |request, server, cx| {
1611                    handler(request, server, cx).boxed_local()
1612                }));
1613        }
1614
1615        fn send_to_zed<T: acp::ClientRequest + 'static>(
1616            &self,
1617            message: T,
1618        ) -> BoxedLocal<Result<T::Response>> {
1619            self.connection
1620                .request(message)
1621                .map(|f| f.map_err(|err| anyhow!(err)))
1622                .boxed_local()
1623        }
1624    }
1625}