acp_thread.rs

   1mod connection;
   2pub use connection::*;
   3
   4pub use acp_old::ToolCallId;
   5use agent_client_protocol::{self as acp};
   6use agentic_coding_protocol as acp_old;
   7use anyhow::{Context as _, Result};
   8use assistant_tool::ActionLog;
   9use buffer_diff::BufferDiff;
  10use editor::{Bias, MultiBuffer, PathKey};
  11use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  12use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  13use itertools::Itertools;
  14use language::{
  15    Anchor, Buffer, BufferSnapshot, Capability, LanguageRegistry, OffsetRangeExt as _, Point,
  16    text_diff,
  17};
  18use markdown::Markdown;
  19use project::{AgentLocation, Project};
  20use std::collections::HashMap;
  21use std::error::Error;
  22use std::fmt::{Formatter, Write};
  23use std::{
  24    fmt::Display,
  25    mem,
  26    path::{Path, PathBuf},
  27    sync::Arc,
  28};
  29use ui::{App, IconName};
  30use util::ResultExt;
  31
  32#[derive(Debug)]
  33pub struct UserMessage {
  34    pub content: ContentBlock,
  35}
  36
  37impl UserMessage {
  38    pub fn from_acp(
  39        message: &[acp::ContentBlock],
  40        language_registry: Arc<LanguageRegistry>,
  41        cx: &mut App,
  42    ) -> Self {
  43        let mut content = ContentBlock::Empty;
  44        for chunk in message {
  45            content.append(chunk, &language_registry, cx)
  46        }
  47        Self { content: content }
  48    }
  49
  50    fn to_markdown(&self, cx: &App) -> String {
  51        format!("## User\n{}\n", self.content.to_markdown(cx))
  52    }
  53}
  54
  55#[derive(Debug)]
  56pub struct MentionPath<'a>(&'a Path);
  57
  58impl<'a> MentionPath<'a> {
  59    const PREFIX: &'static str = "@file:";
  60
  61    pub fn new(path: &'a Path) -> Self {
  62        MentionPath(path)
  63    }
  64
  65    pub fn try_parse(url: &'a str) -> Option<Self> {
  66        let path = url.strip_prefix(Self::PREFIX)?;
  67        Some(MentionPath(Path::new(path)))
  68    }
  69
  70    pub fn path(&self) -> &Path {
  71        self.0
  72    }
  73}
  74
  75impl Display for MentionPath<'_> {
  76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  77        write!(
  78            f,
  79            "[@{}]({}{})",
  80            self.0.file_name().unwrap_or_default().display(),
  81            Self::PREFIX,
  82            self.0.display()
  83        )
  84    }
  85}
  86
  87#[derive(Debug, PartialEq)]
  88pub struct AssistantMessage {
  89    pub chunks: Vec<AssistantMessageChunk>,
  90}
  91
  92impl AssistantMessage {
  93    pub fn to_markdown(&self, cx: &App) -> String {
  94        format!(
  95            "## Assistant\n\n{}\n\n",
  96            self.chunks
  97                .iter()
  98                .map(|chunk| chunk.to_markdown(cx))
  99                .join("\n\n")
 100        )
 101    }
 102}
 103
 104#[derive(Debug, PartialEq)]
 105pub enum AssistantMessageChunk {
 106    Message { block: ContentBlock },
 107    Thought { block: ContentBlock },
 108}
 109
 110impl AssistantMessageChunk {
 111    pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
 112        Self::Message {
 113            block: ContentBlock::new(
 114                &acp::ContentBlock::Text(acp::TextContent {
 115                    text: chunk.to_owned().into(),
 116                    annotations: None,
 117                }),
 118                language_registry,
 119                cx,
 120            ),
 121        }
 122    }
 123
 124    fn to_markdown(&self, cx: &App) -> String {
 125        match self {
 126            Self::Message { block } => block.to_markdown(cx).to_string(),
 127            Self::Thought { block } => {
 128                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 129            }
 130        }
 131    }
 132}
 133
 134#[derive(Debug)]
 135pub enum AgentThreadEntry {
 136    UserMessage(UserMessage),
 137    AssistantMessage(AssistantMessage),
 138    ToolCall(ToolCall),
 139}
 140
 141impl AgentThreadEntry {
 142    fn to_markdown(&self, cx: &App) -> String {
 143        match self {
 144            Self::UserMessage(message) => message.to_markdown(cx),
 145            Self::AssistantMessage(message) => message.to_markdown(cx),
 146            Self::ToolCall(too_call) => too_call.to_markdown(cx),
 147        }
 148    }
 149
 150    pub fn diff(&self) -> Option<&Diff> {
 151        if let AgentThreadEntry::ToolCall(ToolCall {
 152            content: Some(ToolCallContent::Diff { diff }),
 153            ..
 154        }) = self
 155        {
 156            Some(&diff)
 157        } else {
 158            None
 159        }
 160    }
 161
 162    pub fn locations(&self) -> Option<&[acp::ToolCallLocation]> {
 163        if let AgentThreadEntry::ToolCall(ToolCall { locations, .. }) = self {
 164            Some(locations)
 165        } else {
 166            None
 167        }
 168    }
 169}
 170
 171#[derive(Debug)]
 172pub struct ToolCall {
 173    pub id: acp::ToolCallId,
 174    pub label: Entity<Markdown>,
 175    pub kind: acp::ToolKind,
 176    pub content: Option<ToolCallContent>,
 177    pub status: ToolCallStatus,
 178    pub locations: Vec<acp::ToolCallLocation>,
 179}
 180
 181impl ToolCall {
 182    fn to_markdown(&self, cx: &App) -> String {
 183        let mut markdown = format!(
 184            "**Tool Call: {}**\nStatus: {}\n\n",
 185            self.label.read(cx).source(),
 186            self.status
 187        );
 188        if let Some(content) = &self.content {
 189            markdown.push_str(content.to_markdown(cx).as_str());
 190            markdown.push_str("\n\n");
 191        }
 192        markdown
 193    }
 194}
 195
 196#[derive(Debug)]
 197pub enum ToolCallStatus {
 198    WaitingForConfirmation {
 199        possible_grants: Vec<acp::Grant>,
 200        respond_tx: oneshot::Sender<acp::GrantId>,
 201    },
 202    Allowed {
 203        status: acp::ToolCallStatus,
 204    },
 205    Rejected,
 206    Canceled,
 207}
 208
 209impl Display for ToolCallStatus {
 210    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 211        write!(
 212            f,
 213            "{}",
 214            match self {
 215                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 216                ToolCallStatus::Allowed { status } => match status {
 217                    acp::ToolCallStatus::InProgress => "In Progress",
 218                    acp::ToolCallStatus::Completed => "Completed",
 219                    acp::ToolCallStatus::Failed => "Failed",
 220                },
 221                ToolCallStatus::Rejected => "Rejected",
 222                ToolCallStatus::Canceled => "Canceled",
 223            }
 224        )
 225    }
 226}
 227
 228#[derive(Debug)]
 229pub enum ToolCallConfirmation {
 230    Edit {
 231        description: Option<Entity<Markdown>>,
 232    },
 233    Execute {
 234        command: String,
 235        root_command: String,
 236        description: Option<Entity<Markdown>>,
 237    },
 238    Mcp {
 239        server_name: String,
 240        tool_name: String,
 241        tool_display_name: String,
 242        description: Option<Entity<Markdown>>,
 243    },
 244    Fetch {
 245        urls: Vec<SharedString>,
 246        description: Option<Entity<Markdown>>,
 247    },
 248    Other {
 249        description: Entity<Markdown>,
 250    },
 251}
 252
 253impl ToolCallConfirmation {
 254    pub fn from_acp(
 255        confirmation: acp_old::ToolCallConfirmation,
 256        language_registry: Arc<LanguageRegistry>,
 257        cx: &mut App,
 258    ) -> Self {
 259        let to_md = |description: String, cx: &mut App| -> Entity<Markdown> {
 260            cx.new(|cx| {
 261                Markdown::new(
 262                    description.into(),
 263                    Some(language_registry.clone()),
 264                    None,
 265                    cx,
 266                )
 267            })
 268        };
 269
 270        match confirmation {
 271            acp_old::ToolCallConfirmation::Edit { description } => Self::Edit {
 272                description: description.map(|description| to_md(description, cx)),
 273            },
 274            acp_old::ToolCallConfirmation::Execute {
 275                command,
 276                root_command,
 277                description,
 278            } => Self::Execute {
 279                command,
 280                root_command,
 281                description: description.map(|description| to_md(description, cx)),
 282            },
 283            acp_old::ToolCallConfirmation::Mcp {
 284                server_name,
 285                tool_name,
 286                tool_display_name,
 287                description,
 288            } => Self::Mcp {
 289                server_name,
 290                tool_name,
 291                tool_display_name,
 292                description: description.map(|description| to_md(description, cx)),
 293            },
 294            acp_old::ToolCallConfirmation::Fetch { urls, description } => Self::Fetch {
 295                urls: urls.iter().map(|url| url.into()).collect(),
 296                description: description.map(|description| to_md(description, cx)),
 297            },
 298            acp_old::ToolCallConfirmation::Other { description } => Self::Other {
 299                description: to_md(description, cx),
 300            },
 301        }
 302    }
 303}
 304
 305#[derive(Debug, PartialEq, Clone)]
 306enum ContentBlock {
 307    Empty,
 308    Markdown { markdown: Entity<Markdown> },
 309}
 310
 311impl ContentBlock {
 312    pub fn new(
 313        block: &acp::ContentBlock,
 314        language_registry: &Arc<LanguageRegistry>,
 315        cx: &mut App,
 316    ) -> Self {
 317        let mut this = Self::Empty;
 318        this.append(block, language_registry, cx);
 319        this
 320    }
 321
 322    pub fn new_combined(
 323        blocks: &[acp::ContentBlock],
 324        language_registry: &Arc<LanguageRegistry>,
 325        cx: &mut App,
 326    ) -> Self {
 327        let mut this = Self::Empty;
 328        for block in blocks {
 329            this.append(block, language_registry, cx);
 330        }
 331        this
 332    }
 333
 334    pub fn append(
 335        &mut self,
 336        block: &acp::ContentBlock,
 337        language_registry: &Arc<LanguageRegistry>,
 338        cx: &mut App,
 339    ) {
 340        let new_content = match block {
 341            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 342            acp::ContentBlock::ResourceLink(resource_link) => {
 343                if let Some(path) = resource_link.uri.strip_prefix("file://") {
 344                    format!("{}", MentionPath(path.as_ref()))
 345                } else {
 346                    resource_link.uri.clone()
 347                }
 348            }
 349            acp::ContentBlock::Image(_)
 350            | acp::ContentBlock::Audio(_)
 351            | acp::ContentBlock::Resource(_) => String::new(),
 352        };
 353
 354        match self {
 355            ContentBlock::Empty => {
 356                *self = ContentBlock::Markdown {
 357                    markdown: cx.new(|cx| {
 358                        Markdown::new(
 359                            new_content.into(),
 360                            Some(language_registry.clone()),
 361                            None,
 362                            cx,
 363                        )
 364                    }),
 365                };
 366            }
 367            ContentBlock::Markdown { markdown } => {
 368                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 369            }
 370        }
 371    }
 372
 373    fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 374        match self {
 375            ContentBlock::Empty => "",
 376            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 377        }
 378    }
 379}
 380
 381#[derive(Debug)]
 382pub enum ToolCallContent {
 383    ContentBlock { content: ContentBlock },
 384    Diff { diff: Diff },
 385}
 386
 387impl ToolCallContent {
 388    pub fn from_acp(
 389        content: acp::ToolCallContent,
 390        language_registry: Arc<LanguageRegistry>,
 391        cx: &mut App,
 392    ) -> Self {
 393        match content {
 394            acp_old::ToolCallContent::Markdown { markdown } => Self::Markdown {
 395                markdown: cx.new(|cx| Markdown::new_text(markdown.into(), cx)),
 396            },
 397            acp_old::ToolCallContent::Diff { diff } => Self::Diff {
 398                diff: Diff::from_acp(diff, language_registry, cx),
 399            },
 400        }
 401    }
 402
 403    fn to_markdown(&self, cx: &App) -> String {
 404        match self {
 405            Self::Markdown { markdown } => markdown.read(cx).source().to_string(),
 406            Self::Diff { diff } => diff.to_markdown(cx),
 407        }
 408    }
 409}
 410
 411#[derive(Debug)]
 412pub struct Diff {
 413    pub multibuffer: Entity<MultiBuffer>,
 414    pub path: PathBuf,
 415    pub new_buffer: Entity<Buffer>,
 416    pub old_buffer: Entity<Buffer>,
 417    _task: Task<Result<()>>,
 418}
 419
 420impl Diff {
 421    pub fn from_acp(
 422        diff: acp_old::Diff,
 423        language_registry: Arc<LanguageRegistry>,
 424        cx: &mut App,
 425    ) -> Self {
 426        let acp_old::Diff {
 427            path,
 428            old_text,
 429            new_text,
 430        } = diff;
 431
 432        let multibuffer = cx.new(|_cx| MultiBuffer::without_headers(Capability::ReadOnly));
 433
 434        let new_buffer = cx.new(|cx| Buffer::local(new_text, cx));
 435        let old_buffer = cx.new(|cx| Buffer::local(old_text.unwrap_or("".into()), cx));
 436        let new_buffer_snapshot = new_buffer.read(cx).text_snapshot();
 437        let old_buffer_snapshot = old_buffer.read(cx).snapshot();
 438        let buffer_diff = cx.new(|cx| BufferDiff::new(&new_buffer_snapshot, cx));
 439        let diff_task = buffer_diff.update(cx, |diff, cx| {
 440            diff.set_base_text(
 441                old_buffer_snapshot,
 442                Some(language_registry.clone()),
 443                new_buffer_snapshot,
 444                cx,
 445            )
 446        });
 447
 448        let task = cx.spawn({
 449            let multibuffer = multibuffer.clone();
 450            let path = path.clone();
 451            let new_buffer = new_buffer.clone();
 452            async move |cx| {
 453                diff_task.await?;
 454
 455                multibuffer
 456                    .update(cx, |multibuffer, cx| {
 457                        let hunk_ranges = {
 458                            let buffer = new_buffer.read(cx);
 459                            let diff = buffer_diff.read(cx);
 460                            diff.hunks_intersecting_range(Anchor::MIN..Anchor::MAX, &buffer, cx)
 461                                .map(|diff_hunk| diff_hunk.buffer_range.to_point(&buffer))
 462                                .collect::<Vec<_>>()
 463                        };
 464
 465                        multibuffer.set_excerpts_for_path(
 466                            PathKey::for_buffer(&new_buffer, cx),
 467                            new_buffer.clone(),
 468                            hunk_ranges,
 469                            editor::DEFAULT_MULTIBUFFER_CONTEXT,
 470                            cx,
 471                        );
 472                        multibuffer.add_diff(buffer_diff.clone(), cx);
 473                    })
 474                    .log_err();
 475
 476                if let Some(language) = language_registry
 477                    .language_for_file_path(&path)
 478                    .await
 479                    .log_err()
 480                {
 481                    new_buffer.update(cx, |buffer, cx| buffer.set_language(Some(language), cx))?;
 482                }
 483
 484                anyhow::Ok(())
 485            }
 486        });
 487
 488        Self {
 489            multibuffer,
 490            path,
 491            new_buffer,
 492            old_buffer,
 493            _task: task,
 494        }
 495    }
 496
 497    fn to_markdown(&self, cx: &App) -> String {
 498        let buffer_text = self
 499            .multibuffer
 500            .read(cx)
 501            .all_buffers()
 502            .iter()
 503            .map(|buffer| buffer.read(cx).text())
 504            .join("\n");
 505        format!("Diff: {}\n```\n{}\n```\n", self.path.display(), buffer_text)
 506    }
 507}
 508
/// The agent's current plan: an ordered list of entries. Defaults to an empty plan.
#[derive(Debug, Default)]
pub struct Plan {
    /// Plan entries in the order they were provided.
    pub entries: Vec<PlanEntry>,
}
 513
/// Summary of a [`Plan`], borrowing from the plan it was computed from.
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry whose status is in progress, if any.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Number of entries whose status is pending.
    pub pending: u32,
    /// Number of entries whose status is completed.
    pub completed: u32,
}
 520
 521impl Plan {
 522    pub fn is_empty(&self) -> bool {
 523        self.entries.is_empty()
 524    }
 525
 526    pub fn stats(&self) -> PlanStats<'_> {
 527        let mut stats = PlanStats {
 528            in_progress_entry: None,
 529            pending: 0,
 530            completed: 0,
 531        };
 532
 533        for entry in &self.entries {
 534            match &entry.status {
 535                acp_old::PlanEntryStatus::Pending => {
 536                    stats.pending += 1;
 537                }
 538                acp_old::PlanEntryStatus::InProgress => {
 539                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 540                }
 541                acp_old::PlanEntryStatus::Completed => {
 542                    stats.completed += 1;
 543                }
 544            }
 545        }
 546
 547        stats
 548    }
 549}
 550
 551#[derive(Debug)]
 552pub struct PlanEntry {
 553    pub content: Entity<Markdown>,
 554    pub priority: acp_old::PlanEntryPriority,
 555    pub status: acp_old::PlanEntryStatus,
 556}
 557
 558impl PlanEntry {
 559    pub fn from_acp(entry: acp_old::PlanEntry, cx: &mut App) -> Self {
 560        Self {
 561            content: cx.new(|cx| Markdown::new_text(entry.content.into(), cx)),
 562            priority: entry.priority,
 563            status: entry.status,
 564        }
 565    }
 566}
 567
/// A conversation with an agent: its entries, its plan and the connection used to
/// talk to the agent.
pub struct AcpThread {
    /// Title of the thread.
    title: SharedString,
    /// User messages, assistant messages and tool calls in thread order.
    entries: Vec<AgentThreadEntry>,
    /// The plan most recently set through `update_plan`.
    plan: Plan,
    /// Project this thread operates on; also the source of the language registry.
    project: Entity<Project>,
    /// Action log created for `project` when the thread is constructed.
    action_log: Entity<ActionLog>,
    // NOTE(review): presumably snapshots of buffers shared with the agent — confirm
    // against the code that populates this map.
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    /// `Some` while a send is in flight; `status()` reports `Idle` when this is `None`.
    send_task: Option<Task<()>>,
    /// Connection used to send requests to the agent.
    connection: Arc<dyn AgentConnection>,
    // NOTE(review): presumably tracks the agent child process — confirm with callers.
    child_status: Option<Task<Result<()>>>,
}
 579
/// Events emitted by [`AcpThread`] when its entries change.
pub enum AcpThreadEvent {
    /// A new entry was pushed onto the end of the thread.
    NewEntry,
    /// The entry at the given index in the thread's entries was modified.
    EntryUpdated(usize),
}

impl EventEmitter<AcpThreadEvent> for AcpThread {}
 586
/// Coarse activity state of a thread, as reported by `AcpThread::status`.
#[derive(PartialEq, Eq)]
pub enum ThreadStatus {
    /// No send is in flight.
    Idle,
    /// A send is in flight and the thread is waiting for a tool confirmation.
    WaitingForToolConfirmation,
    /// A send is in flight and nothing is awaiting confirmation.
    Generating,
}
 593
 594#[derive(Debug, Clone)]
 595pub enum LoadError {
 596    Unsupported {
 597        error_message: SharedString,
 598        upgrade_message: SharedString,
 599        upgrade_command: String,
 600    },
 601    Exited(i32),
 602    Other(SharedString),
 603}
 604
 605impl Display for LoadError {
 606    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 607        match self {
 608            LoadError::Unsupported { error_message, .. } => write!(f, "{}", error_message),
 609            LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
 610            LoadError::Other(msg) => write!(f, "{}", msg),
 611        }
 612    }
 613}
 614
 615impl Error for LoadError {}
 616
 617impl AcpThread {
 618    pub fn new(
 619        connection: impl AgentConnection + 'static,
 620        title: SharedString,
 621        child_status: Option<Task<Result<()>>>,
 622        project: Entity<Project>,
 623        cx: &mut Context<Self>,
 624    ) -> Self {
 625        let action_log = cx.new(|_| ActionLog::new(project.clone()));
 626
 627        Self {
 628            action_log,
 629            shared_buffers: Default::default(),
 630            entries: Default::default(),
 631            plan: Default::default(),
 632            title,
 633            project,
 634            send_task: None,
 635            connection: Arc::new(connection),
 636            child_status,
 637        }
 638    }
 639
 640    /// Send a request to the agent and wait for a response.
 641    pub fn request<R: acp_old::AgentRequest + 'static>(
 642        &self,
 643        params: R,
 644    ) -> impl use<R> + Future<Output = Result<R::Response>> {
 645        let params = params.into_any();
 646        let result = self.connection.request_any(params);
 647        async move {
 648            let result = result.await?;
 649            Ok(R::response_from_any(result)?)
 650        }
 651    }
 652
    /// Returns the action log created for this thread's project.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
 656
    /// Returns the project this thread operates on.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
 660
    /// Returns a clone of the thread title.
    pub fn title(&self) -> SharedString {
        self.title.clone()
    }
 664
    /// Returns all entries of the thread in order.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
 668
 669    pub fn status(&self) -> ThreadStatus {
 670        if self.send_task.is_some() {
 671            if self.waiting_for_tool_confirmation() {
 672                ThreadStatus::WaitingForToolConfirmation
 673            } else {
 674                ThreadStatus::Generating
 675            }
 676        } else {
 677            ThreadStatus::Idle
 678        }
 679    }
 680
 681    pub fn has_pending_edit_tool_calls(&self) -> bool {
 682        for entry in self.entries.iter().rev() {
 683            match entry {
 684                AgentThreadEntry::UserMessage(_) => return false,
 685                AgentThreadEntry::ToolCall(ToolCall {
 686                    status:
 687                        ToolCallStatus::Allowed {
 688                            status: acp::ToolCallStatus::InProgress,
 689                            ..
 690                        },
 691                    content: Some(ToolCallContent::Diff { .. }),
 692                    ..
 693                }) => return true,
 694                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
 695            }
 696        }
 697
 698        false
 699    }
 700
    /// Appends `entry` to the thread and emits [`AcpThreadEvent::NewEntry`].
    pub fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
 705
 706    pub fn push_assistant_chunk(
 707        &mut self,
 708        chunk: acp::ContentBlock,
 709        is_thought: bool,
 710        cx: &mut Context<Self>,
 711    ) {
 712        let language_registry = self.project.read(cx).languages().clone();
 713        let entries_len = self.entries.len();
 714        if let Some(last_entry) = self.entries.last_mut()
 715            && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
 716        {
 717            cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
 718            match (chunks.last_mut(), is_thought) {
 719                (Some(AssistantMessageChunk::Message { block }), false)
 720                | (Some(AssistantMessageChunk::Thought { block }), true) => {
 721                    block.append(&chunk, &language_registry, cx)
 722                }
 723                _ => {
 724                    let block = ContentBlock::new(&chunk, &language_registry, cx);
 725                    if is_thought {
 726                        chunks.push(AssistantMessageChunk::Thought { block })
 727                    } else {
 728                        chunks.push(AssistantMessageChunk::Message { block })
 729                    }
 730                }
 731            }
 732        } else {
 733            let block = ContentBlock::new(&chunk, &language_registry, cx);
 734            let chunk = if is_thought {
 735                AssistantMessageChunk::Thought { block }
 736            } else {
 737                AssistantMessageChunk::Message { block }
 738            };
 739
 740            self.push_entry(
 741                AgentThreadEntry::AssistantMessage(AssistantMessage {
 742                    chunks: vec![chunk],
 743                }),
 744                cx,
 745            );
 746        }
 747    }
 748
 749    pub fn update_tool_call(
 750        &mut self,
 751        tool_call: acp::ToolCall,
 752        cx: &mut Context<Self>,
 753    ) -> Result<()> {
 754        let language_registry = self.project.read(cx).languages().clone();
 755
 756        let new_tool_call = ToolCall {
 757            id: tool_call.id,
 758            label: cx.new(|cx| {
 759                Markdown::new(
 760                    tool_call.label.into(),
 761                    Some(language_registry.clone()),
 762                    None,
 763                    cx,
 764                )
 765            }),
 766            kind: tool_call.kind,
 767            content: tool_call
 768                .content
 769                .into_iter()
 770                .map(|content| ToolCallContent::from_acp(content, language_registry, cx))
 771                .collect(),
 772            locations: tool_call.locations,
 773            status: ToolCallStatus::Allowed {
 774                status: tool_call.status,
 775            },
 776        };
 777
 778        if let Some((ix, current_call)) = self.tool_call_mut(tool_call.id) {
 779            match &mut current_call.status {
 780                ToolCallStatus::Allowed { status } => {
 781                    *status = tool_call.status;
 782                }
 783                ToolCallStatus::WaitingForConfirmation { .. } => {
 784                    anyhow::bail!("Tool call hasn't been authorized yet")
 785                }
 786                ToolCallStatus::Rejected => {
 787                    anyhow::bail!("Tool call was rejected and therefore can't be updated")
 788                }
 789                ToolCallStatus::Canceled => {
 790                    current_call.status = ToolCallStatus::Allowed { status: new_status };
 791                }
 792            }
 793
 794            *current_call = new_tool_call;
 795
 796            let location = current_call.locations.last().cloned();
 797            if let Some(location) = location {
 798                self.set_project_location(location, cx)
 799            }
 800
 801            cx.emit(AcpThreadEvent::EntryUpdated(ix));
 802        } else {
 803            let language_registry = self.project.read(cx).languages().clone();
 804            let call = ToolCall {
 805                id: tool_call.id,
 806                label: cx.new(|cx| {
 807                    Markdown::new(
 808                        tool_call.label.into(),
 809                        Some(language_registry.clone()),
 810                        None,
 811                        cx,
 812                    )
 813                }),
 814                kind: tool_call.kind,
 815                content: tool_call
 816                    .content
 817                    .into_iter()
 818                    .map(|content| ToolCallContent::from_acp(content, language_registry, cx))
 819                    .collect(),
 820                locations: tool_call.locations,
 821                status: ToolCallStatus::Allowed {
 822                    status: tool_call.status,
 823                },
 824            };
 825
 826            let location = call.locations.last().cloned();
 827            if let Some(location) = location {
 828                self.set_project_location(location, cx)
 829            }
 830
 831            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
 832        }
 833
 834        Ok(())
 835    }
 836
 837    fn tool_call_mut(&mut self, id: acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
 838        self.entries
 839            .iter_mut()
 840            .enumerate()
 841            .rev()
 842            .find_map(|(index, tool_call)| {
 843                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
 844                    && tool_call.id == id
 845                {
 846                    Some((index, tool_call))
 847                } else {
 848                    None
 849                }
 850            })
 851    }
 852
 853    pub fn request_tool_call_permission(
 854        &mut self,
 855        tool_call: acp::ToolCall,
 856        possible_grants: Vec<acp::Grant>,
 857        cx: &mut Context<Self>,
 858    ) -> ToolCallRequest {
 859        let (tx, rx) = oneshot::channel();
 860
 861        let status = ToolCallStatus::WaitingForConfirmation {
 862            possible_grants,
 863            respond_tx: tx,
 864        };
 865
 866        let id = self.insert_tool_call(tool_call, status, cx);
 867        ToolCallRequest { id, outcome: rx }
 868    }
 869
    /// Puts an existing tool call into the waiting-for-confirmation state and returns a
    /// request whose `outcome` receives the user's response.
    ///
    /// Emits `EntryUpdated` for the tool call's entry.
    ///
    /// # Errors
    ///
    /// Fails with "Tool call not found" when no tool call has the given id.
    // NOTE(review): `ToolCallStatus::WaitingForConfirmation` as declared in this file
    // has `possible_grants`, not `confirmation`, and `tool_call_mut` takes an
    // `acp::ToolCallId` while `tool_call_id` is the `acp_old` type — this looks like a
    // leftover from the old protocol; confirm whether it should be migrated or removed.
    pub fn request_tool_call_confirmation(
        &mut self,
        tool_call_id: ToolCallId,
        confirmation: acp_old::ToolCallConfirmation,
        cx: &mut Context<Self>,
    ) -> Result<ToolCallRequest> {
        // Despite its name, this holds the project's language registry.
        let project = self.project.read(cx).languages().clone();
        let Some((idx, call)) = self.tool_call_mut(tool_call_id) else {
            anyhow::bail!("Tool call not found");
        };

        let (tx, rx) = oneshot::channel();

        call.status = ToolCallStatus::WaitingForConfirmation {
            confirmation: ToolCallConfirmation::from_acp(confirmation, project, cx),
            respond_tx: tx,
        };

        cx.emit(AcpThreadEvent::EntryUpdated(idx));

        Ok(ToolCallRequest {
            id: tool_call_id,
            outcome: rx,
        })
    }
 895
    /// Appends a new tool call entry with the given `status` and returns its id.
    ///
    /// The id is derived from the number of entries before the push. If the call has
    /// locations, the last one is passed to `set_project_location`.
    // NOTE(review): `ToolCall` as declared in this file has a `kind` field and no
    // `icon` field, and its `id` is an `acp::ToolCallId` rather than the `acp_old` id
    // built here; `request_tool_call_permission` also passes an `acp::ToolCall`, not
    // `acp_old::PushToolCallParams` — confirm the intended protocol version.
    fn insert_tool_call(
        &mut self,
        tool_call: acp_old::PushToolCallParams,
        status: ToolCallStatus,
        cx: &mut Context<Self>,
    ) -> acp_old::ToolCallId {
        let language_registry = self.project.read(cx).languages().clone();
        let id = acp_old::ToolCallId(self.entries.len() as u64);
        let call = ToolCall {
            id,
            label: cx.new(|cx| {
                Markdown::new(
                    tool_call.label.into(),
                    Some(language_registry.clone()),
                    None,
                    cx,
                )
            }),
            icon: acp_icon_to_ui_icon(tool_call.icon),
            content: tool_call
                .content
                .map(|content| ToolCallContent::from_acp(content, language_registry, cx)),
            locations: tool_call.locations,
            status,
        };

        let location = call.locations.last().cloned();
        if let Some(location) = location {
            self.set_project_location(location, cx)
        }

        self.push_entry(AgentThreadEntry::ToolCall(call), cx);

        id
    }
 931
 932    pub fn authorize_tool_call(
 933        &mut self,
 934        id: acp::ToolCallId,
 935        grant: acp::Grant,
 936        cx: &mut Context<Self>,
 937    ) {
 938        let Some((ix, call)) = self.tool_call_mut(id) else {
 939            return;
 940        };
 941
 942        let new_status = if grant.is_allowed {
 943            ToolCallStatus::Allowed {
 944                status: acp::ToolCallStatus::InProgress,
 945            }
 946        } else {
 947            ToolCallStatus::Rejected
 948        };
 949
 950        let curr_status = mem::replace(&mut call.status, new_status);
 951
 952        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
 953            respond_tx.send(grant.id).log_err();
 954        } else if cfg!(debug_assertions) {
 955            panic!("tried to authorize an already authorized tool call");
 956        }
 957
 958        cx.emit(AcpThreadEvent::EntryUpdated(ix));
 959    }
 960
    /// Returns the plan currently stored on this thread.
    pub fn plan(&self) -> &Plan {
        &self.plan
    }
 964
 965    pub fn update_plan(&mut self, request: acp_old::UpdatePlanParams, cx: &mut Context<Self>) {
 966        self.plan = Plan {
 967            entries: request
 968                .entries
 969                .into_iter()
 970                .map(|entry| PlanEntry::from_acp(entry, cx))
 971                .collect(),
 972        };
 973
 974        cx.notify();
 975    }
 976
 977    pub fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
 978        self.plan
 979            .entries
 980            .retain(|entry| !matches!(entry.status, acp_old::PlanEntryStatus::Completed));
 981        cx.notify();
 982    }
 983
 984    pub fn set_project_location(&self, location: acp::ToolCallLocation, cx: &mut Context<Self>) {
 985        self.project.update(cx, |project, cx| {
 986            let Some(path) = project.project_path_for_absolute_path(&location.path, cx) else {
 987                return;
 988            };
 989            let buffer = project.open_buffer(path, cx);
 990            cx.spawn(async move |project, cx| {
 991                let buffer = buffer.await?;
 992
 993                project.update(cx, |project, cx| {
 994                    let position = if let Some(line) = location.line {
 995                        let snapshot = buffer.read(cx).snapshot();
 996                        let point = snapshot.clip_point(Point::new(line, 0), Bias::Left);
 997                        snapshot.anchor_before(point)
 998                    } else {
 999                        Anchor::MIN
1000                    };
1001
1002                    project.set_agent_location(
1003                        Some(AgentLocation {
1004                            buffer: buffer.downgrade(),
1005                            position,
1006                        }),
1007                        cx,
1008                    );
1009                })
1010            })
1011            .detach_and_log_err(cx);
1012        });
1013    }
1014
1015    /// Returns true if the last turn is awaiting tool authorization
1016    pub fn waiting_for_tool_confirmation(&self) -> bool {
1017        for entry in self.entries.iter().rev() {
1018            match &entry {
1019                AgentThreadEntry::ToolCall(call) => match call.status {
1020                    ToolCallStatus::WaitingForConfirmation { .. } => return true,
1021                    ToolCallStatus::Allowed { .. }
1022                    | ToolCallStatus::Rejected
1023                    | ToolCallStatus::Canceled => continue,
1024                },
1025                AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1026                    // Reached the beginning of the turn
1027                    return false;
1028                }
1029            }
1030        }
1031        false
1032    }
1033
1034    pub fn initialize(&self) -> impl use<> + Future<Output = Result<acp_old::InitializeResponse>> {
1035        self.request(acp_old::InitializeParams {
1036            protocol_version: acp_old::ProtocolVersion::latest(),
1037        })
1038    }
1039
    /// Sends an authenticate request to the agent.
    ///
    /// The returned future captures nothing from `self` (`use<>`).
    pub fn authenticate(&self) -> impl use<> + Future<Output = Result<()>> {
        self.request(acp_old::AuthenticateParams)
    }
1043
1044    #[cfg(any(test, feature = "test-support"))]
1045    pub fn send_raw(
1046        &mut self,
1047        message: &str,
1048        cx: &mut Context<Self>,
1049    ) -> BoxFuture<'static, Result<(), acp_old::Error>> {
1050        self.send(
1051            vec![acp::ContentBlock::Text(acp::TextContent {
1052                text: message.to_string(),
1053                annotations: None,
1054            })],
1055            cx,
1056        )
1057    }
1058
    /// Sends a user message to the agent.
    ///
    /// The message is appended to the thread as a user entry, any send that is
    /// still in flight is cancelled, and the request is issued from a task stored
    /// in `send_task`.
    ///
    /// The returned future yields the request's error if the request failed and
    /// `Ok(())` in every other case, including when the result channel is dropped
    /// before a result arrives.
    pub fn send(
        &mut self,
        message: Vec<acp::ContentBlock>,
        cx: &mut Context<Self>,
    ) -> BoxFuture<'static, Result<(), acp_old::Error>> {
        let block = ContentBlock::new_combined(&message, self.project.read(cx).languages(), cx);
        self.push_entry(
            AgentThreadEntry::UserMessage(UserMessage { content: block }),
            cx,
        );

        let (tx, rx) = oneshot::channel();
        // `cancel` takes the previous `send_task` (if any) out of `self`, so it has
        // to run before the new task is stored below.
        let cancel = self.cancel(cx);

        self.send_task = Some(cx.spawn(async move |this, cx| {
            async {
                // A failed cancellation is only logged; the new request is still issued.
                cancel.await.log_err();

                let result = this.update(cx, |this, _| this.request(message))?.await;
                tx.send(result).log_err();
                // Clear `send_task` now that the request has finished.
                this.update(cx, |this, _cx| this.send_task.take())?;
                anyhow::Ok(())
            }
            .await
            .log_err();
        }));

        async move {
            match rx.await {
                Ok(Err(e)) => Err(e)?,
                // Both a successful response and a dropped sender are reported as success.
                _ => Ok(()),
            }
        }
        .boxed()
    }
1094
    /// Cancels the in-flight send, if there is one.
    ///
    /// When `send_task` is set it is taken out of `self` and dropped, and a cancel
    /// request is sent to the agent. Once that request succeeds, every tool call
    /// that is waiting for confirmation or still running is marked
    /// [`ToolCallStatus::Canceled`], and pending confirmations are answered with
    /// `Cancel`. If the cancel request fails, the entries are left untouched and
    /// the error is returned. With no send in flight the task resolves immediately.
    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<Result<(), acp_old::Error>> {
        if self.send_task.take().is_some() {
            let request = self.request(acp_old::CancelSendMessageParams);
            cx.spawn(async move |this, cx| {
                request.await?;
                // NOTE(review): the context is unused here, so no event is emitted for
                // the entries whose status changes below — confirm that is intended.
                this.update(cx, |this, _cx| {
                    for entry in this.entries.iter_mut() {
                        if let AgentThreadEntry::ToolCall(call) = entry {
                            // Only calls that have not reached a final state are canceled.
                            let cancel = matches!(
                                call.status,
                                ToolCallStatus::WaitingForConfirmation { .. }
                                    | ToolCallStatus::Allowed {
                                        status: acp_old::ToolCallStatus::Running
                                    }
                            );

                            if cancel {
                                let curr_status =
                                    mem::replace(&mut call.status, ToolCallStatus::Canceled);

                                // Unblock whoever is waiting on the confirmation; a
                                // receiver that is already gone is ignored.
                                if let ToolCallStatus::WaitingForConfirmation {
                                    respond_tx, ..
                                } = curr_status
                                {
                                    respond_tx
                                        .send(acp_old::ToolCallConfirmationOutcome::Cancel)
                                        .ok();
                                }
                            }
                        }
                    }
                })?;
                Ok(())
            })
        } else {
            Task::ready(Ok(()))
        }
    }
1133
1134    pub fn read_text_file(
1135        &self,
1136        request: acp_old::ReadTextFileParams,
1137        reuse_shared_snapshot: bool,
1138        cx: &mut Context<Self>,
1139    ) -> Task<Result<String>> {
1140        let project = self.project.clone();
1141        let action_log = self.action_log.clone();
1142        cx.spawn(async move |this, cx| {
1143            let load = project.update(cx, |project, cx| {
1144                let path = project
1145                    .project_path_for_absolute_path(&request.path, cx)
1146                    .context("invalid path")?;
1147                anyhow::Ok(project.open_buffer(path, cx))
1148            });
1149            let buffer = load??.await?;
1150
1151            let snapshot = if reuse_shared_snapshot {
1152                this.read_with(cx, |this, _| {
1153                    this.shared_buffers.get(&buffer.clone()).cloned()
1154                })
1155                .log_err()
1156                .flatten()
1157            } else {
1158                None
1159            };
1160
1161            let snapshot = if let Some(snapshot) = snapshot {
1162                snapshot
1163            } else {
1164                action_log.update(cx, |action_log, cx| {
1165                    action_log.buffer_read(buffer.clone(), cx);
1166                })?;
1167                project.update(cx, |project, cx| {
1168                    let position = buffer
1169                        .read(cx)
1170                        .snapshot()
1171                        .anchor_before(Point::new(request.line.unwrap_or_default(), 0));
1172                    project.set_agent_location(
1173                        Some(AgentLocation {
1174                            buffer: buffer.downgrade(),
1175                            position,
1176                        }),
1177                        cx,
1178                    );
1179                })?;
1180
1181                buffer.update(cx, |buffer, _| buffer.snapshot())?
1182            };
1183
1184            this.update(cx, |this, _| {
1185                let text = snapshot.text();
1186                this.shared_buffers.insert(buffer.clone(), snapshot);
1187                if request.line.is_none() && request.limit.is_none() {
1188                    return Ok(text);
1189                }
1190                let limit = request.limit.unwrap_or(u32::MAX) as usize;
1191                let Some(line) = request.line else {
1192                    return Ok(text.lines().take(limit).collect::<String>());
1193                };
1194
1195                let count = text.lines().count();
1196                if count < line as usize {
1197                    anyhow::bail!("There are only {} lines", count);
1198                }
1199                Ok(text
1200                    .lines()
1201                    .skip(line as usize + 1)
1202                    .take(limit)
1203                    .collect::<String>())
1204            })?
1205        })
1206    }
1207
    /// Replaces the contents of the file at `path` with `content` and saves it.
    ///
    /// The new content is diffed against the snapshot recorded in `shared_buffers`
    /// for this buffer (or the buffer's current snapshot when none is recorded).
    /// The resulting edits are expressed as anchors in that snapshot and applied
    /// to the live buffer. The agent location is moved to the end of the last
    /// edit, or to `Anchor::MIN` when the diff produced no edits.
    ///
    /// # Errors
    ///
    /// Fails with "invalid path" when `path` does not resolve to a project path,
    /// and when opening or saving the buffer fails.
    pub fn write_text_file(
        &self,
        path: PathBuf,
        content: String,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let project = self.project.clone();
        let action_log = self.action_log.clone();
        cx.spawn(async move |this, cx| {
            let load = project.update(cx, |project, cx| {
                let path = project
                    .project_path_for_absolute_path(&path, cx)
                    .context("invalid path")?;
                anyhow::Ok(project.open_buffer(path, cx))
            });
            let buffer = load??.await?;
            // Diff against what was last shared for this buffer, if anything.
            let snapshot = this.update(cx, |this, cx| {
                this.shared_buffers
                    .get(&buffer)
                    .cloned()
                    .unwrap_or_else(|| buffer.read(cx).snapshot())
            })?;
            // The diff runs on the background executor.
            let edits = cx
                .background_executor()
                .spawn(async move {
                    let old_text = snapshot.text();
                    text_diff(old_text.as_str(), &content)
                        .into_iter()
                        .map(|(range, replacement)| {
                            (
                                snapshot.anchor_after(range.start)
                                    ..snapshot.anchor_before(range.end),
                                replacement,
                            )
                        })
                        .collect::<Vec<_>>()
                })
                .await;
            cx.update(|cx| {
                project.update(cx, |project, cx| {
                    project.set_agent_location(
                        Some(AgentLocation {
                            buffer: buffer.downgrade(),
                            position: edits
                                .last()
                                .map(|(range, _)| range.end)
                                .unwrap_or(Anchor::MIN),
                        }),
                        cx,
                    );
                });

                // The action log is told about a read before the edit and about the
                // edit after it.
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_read(buffer.clone(), cx);
                });
                buffer.update(cx, |buffer, cx| {
                    buffer.edit(edits, None, cx);
                });
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_edited(buffer.clone(), cx);
                });
            })?;
            project
                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
                .await
        })
    }
1275
    /// Takes the stored child-status task, leaving `None` behind; later calls
    /// therefore return `None`.
    pub fn child_status(&mut self) -> Option<Task<Result<()>>> {
        self.child_status.take()
    }
1279
1280    pub fn to_markdown(&self, cx: &App) -> String {
1281        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
1282    }
1283}
1284
/// Forwards requests arriving over the old ACP connection to an [`AcpThread`].
///
/// Holds a weak handle to the thread and the async app context used to reach it.
#[derive(Clone)]
pub struct OldAcpClientDelegate {
    /// Thread that receives the forwarded requests.
    thread: WeakEntity<AcpThread>,
    /// Async app context; the methods go through it to update the thread.
    cx: AsyncApp,
    // sent_buffer_versions: HashMap<Entity<Buffer>, HashMap<u64, BufferSnapshot>>,
}
1291
impl OldAcpClientDelegate {
    /// Creates a delegate that forwards to `thread` through `cx`.
    pub fn new(thread: WeakEntity<AcpThread>, cx: AsyncApp) -> Self {
        Self { thread, cx }
    }

    /// Removes the completed entries from the thread's plan.
    ///
    /// Returns an error if the thread could not be updated.
    pub async fn clear_completed_plan_entries(&self) -> Result<()> {
        let cx = &mut self.cx.clone();
        cx.update(|cx| {
            self.thread
                .update(cx, |thread, cx| thread.clear_completed_plan_entries(cx))
        })?
        .context("Failed to update thread")?;

        Ok(())
    }

    /// Requests confirmation for the tool call `tool_call_id` and waits for the
    /// outcome.
    ///
    /// Returns an error if the thread could not be updated, if the thread rejects
    /// the request, or if waiting for the outcome fails.
    pub async fn request_existing_tool_call_confirmation(
        &self,
        tool_call_id: ToolCallId,
        confirmation: acp_old::ToolCallConfirmation,
    ) -> Result<acp_old::ToolCallConfirmationOutcome> {
        let cx = &mut self.cx.clone();
        let ToolCallRequest { outcome, .. } = cx
            .update(|cx| {
                self.thread.update(cx, |thread, cx| {
                    thread.request_tool_call_confirmation(tool_call_id, confirmation, cx)
                })
            })?
            .context("Failed to update thread")??;

        Ok(outcome.await?)
    }

    /// Reads a text file through the thread, passing `true` for the thread's
    /// `reuse_shared_snapshot` argument.
    pub async fn read_text_file_reusing_snapshot(
        &self,
        request: acp_old::ReadTextFileParams,
    ) -> Result<acp_old::ReadTextFileResponse, acp_old::Error> {
        let content = self
            .cx
            .update(|cx| {
                self.thread
                    .update(cx, |thread, cx| thread.read_text_file(request, true, cx))
            })?
            .context("Failed to update thread")?
            .await?;
        Ok(acp_old::ReadTextFileResponse { content })
    }
}
1340
/// Client side of the old protocol: each request is forwarded to the
/// corresponding method on the thread.
impl acp_old::Client for OldAcpClientDelegate {
    /// Appends a streamed assistant chunk to the thread.
    ///
    /// A thread that can no longer be updated is ignored (`.ok()`).
    async fn stream_assistant_message_chunk(
        &self,
        params: acp_old::StreamAssistantMessageChunkParams,
    ) -> Result<(), acp_old::Error> {
        let cx = &mut self.cx.clone();

        cx.update(|cx| {
            self.thread
                .update(cx, |thread, cx| {
                    thread.push_assistant_chunk(params.chunk, cx)
                })
                .ok();
        })?;

        Ok(())
    }

    /// Registers a new tool call that needs confirmation and waits for the
    /// outcome before responding.
    async fn request_tool_call_confirmation(
        &self,
        request: acp_old::RequestToolCallConfirmationParams,
    ) -> Result<acp_old::RequestToolCallConfirmationResponse, acp_old::Error> {
        let cx = &mut self.cx.clone();
        let ToolCallRequest { id, outcome } = cx
            .update(|cx| {
                self.thread
                    .update(cx, |thread, cx| thread.request_new_tool_call(request, cx))
            })?
            .context("Failed to update thread")?;

        Ok(acp_old::RequestToolCallConfirmationResponse {
            id,
            // Failing to receive the outcome is reported as an internal error.
            outcome: outcome.await.map_err(acp_old::Error::into_internal_error)?,
        })
    }

    /// Pushes a tool call onto the thread and returns the id assigned to it.
    async fn push_tool_call(
        &self,
        request: acp_old::PushToolCallParams,
    ) -> Result<acp_old::PushToolCallResponse, acp_old::Error> {
        let cx = &mut self.cx.clone();
        let id = cx
            .update(|cx| {
                self.thread
                    .update(cx, |thread, cx| thread.push_tool_call(request, cx))
            })?
            .context("Failed to update thread")?;

        Ok(acp_old::PushToolCallResponse { id })
    }

    /// Updates the status and content of an existing tool call.
    async fn update_tool_call(
        &self,
        request: acp_old::UpdateToolCallParams,
    ) -> Result<(), acp_old::Error> {
        let cx = &mut self.cx.clone();

        cx.update(|cx| {
            self.thread.update(cx, |thread, cx| {
                thread.update_tool_call(request.tool_call_id, request.status, request.content, cx)
            })
        })?
        .context("Failed to update thread")??;

        Ok(())
    }

    /// Replaces the thread's plan with the entries in `request`.
    async fn update_plan(&self, request: acp_old::UpdatePlanParams) -> Result<(), acp_old::Error> {
        let cx = &mut self.cx.clone();

        cx.update(|cx| {
            self.thread
                .update(cx, |thread, cx| thread.update_plan(request, cx))
        })?
        .context("Failed to update thread")?;

        Ok(())
    }

    /// Reads a text file through the thread, passing `false` for the thread's
    /// `reuse_shared_snapshot` argument.
    async fn read_text_file(
        &self,
        request: acp_old::ReadTextFileParams,
    ) -> Result<acp_old::ReadTextFileResponse, acp_old::Error> {
        let content = self
            .cx
            .update(|cx| {
                self.thread
                    .update(cx, |thread, cx| thread.read_text_file(request, false, cx))
            })?
            .context("Failed to update thread")?
            .await?;
        Ok(acp_old::ReadTextFileResponse { content })
    }

    /// Writes `request.content` to `request.path` through the thread.
    async fn write_text_file(
        &self,
        request: acp_old::WriteTextFileParams,
    ) -> Result<(), acp_old::Error> {
        self.cx
            .update(|cx| {
                self.thread.update(cx, |thread, cx| {
                    thread.write_text_file(request.path, request.content, cx)
                })
            })?
            .context("Failed to update thread")?
            .await?;

        Ok(())
    }
}
1451
1452fn acp_icon_to_ui_icon(icon: acp_old::Icon) -> IconName {
1453    match icon {
1454        acp_old::Icon::FileSearch => IconName::ToolSearch,
1455        acp_old::Icon::Folder => IconName::ToolFolder,
1456        acp_old::Icon::Globe => IconName::ToolWeb,
1457        acp_old::Icon::Hammer => IconName::ToolHammer,
1458        acp_old::Icon::LightBulb => IconName::ToolBulb,
1459        acp_old::Icon::Pencil => IconName::ToolPencil,
1460        acp_old::Icon::Regex => IconName::ToolRegex,
1461        acp_old::Icon::Terminal => IconName::ToolTerminal,
1462    }
1463}
1464
1465#[cfg(test)]
1466mod tests {
1467    use super::*;
1468    use anyhow::anyhow;
1469    use async_pipe::{PipeReader, PipeWriter};
1470    use futures::{channel::mpsc, future::LocalBoxFuture, select};
1471    use gpui::{AsyncApp, TestAppContext};
1472    use indoc::indoc;
1473    use project::FakeFs;
1474    use serde_json::json;
1475    use settings::SettingsStore;
1476    use smol::{future::BoxedLocal, stream::StreamExt as _};
1477    use std::{cell::RefCell, rc::Rc, time::Duration};
1478    use util::path;
1479
    /// Shared test setup: initializes logging (ignoring an already-initialized
    /// logger) and installs the test settings store plus project and language
    /// settings.
    fn init_test(cx: &mut TestAppContext) {
        env_logger::try_init().ok();
        cx.update(|cx| {
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
            Project::init_settings(cx);
            language::init(cx);
        });
    }
1489
    /// Two consecutive thought chunks streamed by the agent must be rendered as a
    /// single `<thinking>` block in the thread's markdown.
    #[gpui::test]
    async fn test_thinking_concatenation(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let (thread, fake_server) = fake_acp_thread(project, cx);

        // The fake agent answers every user message with two thought chunks.
        fake_server.update(cx, |fake_server, _| {
            fake_server.on_user_message(move |_, server, mut cx| async move {
                server
                    .update(&mut cx, |server, _| {
                        server.send_to_zed(acp_old::StreamAssistantMessageChunkParams {
                            chunk: acp_old::AssistantMessageChunk::Thought {
                                thought: "Thinking ".into(),
                            },
                        })
                    })?
                    .await
                    .unwrap();
                server
                    .update(&mut cx, |server, _| {
                        server.send_to_zed(acp_old::StreamAssistantMessageChunkParams {
                            chunk: acp_old::AssistantMessageChunk::Thought {
                                thought: "hard!".into(),
                            },
                        })
                    })?
                    .await
                    .unwrap();

                Ok(())
            })
        });

        thread
            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
            .await
            .unwrap();

        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
        assert_eq!(
            output,
            indoc! {r#"
            ## User

            Hello from Zed!

            ## Assistant

            <thinking>
            Thinking hard!
            </thinking>

            "#}
        );
    }
1547
    /// A write from the agent, based on content it read earlier, must merge with
    /// an edit the user made to the buffer in between instead of overwriting it.
    #[gpui::test]
    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
            .await;
        let project = Project::test(fs.clone(), [], cx).await;
        let (thread, fake_server) = fake_acp_thread(project.clone(), cx);
        let (worktree, pathbuf) = project
            .update(cx, |project, cx| {
                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
            })
            .await
            .unwrap();
        let buffer = project
            .update(cx, |project, cx| {
                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
            })
            .await
            .unwrap();

        // Signals the test body once the agent has read the file, so the user edit
        // below lands between the agent's read and its write.
        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));

        fake_server.update(cx, |fake_server, _| {
            fake_server.on_user_message(move |_, server, mut cx| {
                let read_file_tx = read_file_tx.clone();
                async move {
                    let content = server
                        .update(&mut cx, |server, _| {
                            server.send_to_zed(acp_old::ReadTextFileParams {
                                path: path!("/tmp/foo").into(),
                                line: None,
                                limit: None,
                            })
                        })?
                        .await
                        .unwrap();
                    assert_eq!(content.content, "one\ntwo\nthree\n");
                    read_file_tx.take().unwrap().send(()).unwrap();
                    server
                        .update(&mut cx, |server, _| {
                            server.send_to_zed(acp_old::WriteTextFileParams {
                                path: path!("/tmp/foo").into(),
                                content: "one\ntwo\nthree\nfour\nfive\n".to_string(),
                            })
                        })?
                        .await
                        .unwrap();
                    Ok(())
                }
            })
        });

        let request = thread.update(cx, |thread, cx| {
            thread.send_raw("Extend the count in /tmp/foo", cx)
        });
        read_file_rx.await.ok();
        // The user prepends a line after the agent's read.
        buffer.update(cx, |buffer, cx| {
            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
        });
        cx.run_until_parked();
        // Both the buffer and the file on disk contain the user's and the agent's changes.
        assert_eq!(
            buffer.read_with(cx, |buffer, _| buffer.text()),
            "zero\none\ntwo\nthree\nfour\nfive\n"
        );
        assert_eq!(
            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
            "zero\none\ntwo\nthree\nfour\nfive\n"
        );
        request.await.unwrap();
    }
1621
    /// A tool call that was marked canceled must still move to `Finished` when the
    /// agent later reports that it finished.
    #[gpui::test]
    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let (thread, fake_server) = fake_acp_thread(project, cx);

        // Keeps the agent's turn open until the test drops `end_turn_tx`.
        let (end_turn_tx, end_turn_rx) = oneshot::channel::<()>();

        let tool_call_id = Rc::new(RefCell::new(None));
        let end_turn_rx = Rc::new(RefCell::new(Some(end_turn_rx)));
        fake_server.update(cx, |fake_server, _| {
            let tool_call_id = tool_call_id.clone();
            fake_server.on_user_message(move |_, server, mut cx| {
                let end_turn_rx = end_turn_rx.clone();
                let tool_call_id = tool_call_id.clone();
                async move {
                    let tool_call_result = server
                        .update(&mut cx, |server, _| {
                            server.send_to_zed(acp_old::PushToolCallParams {
                                label: "Fetch".to_string(),
                                icon: acp_old::Icon::Globe,
                                content: None,
                                locations: vec![],
                            })
                        })?
                        .await
                        .unwrap();
                    *tool_call_id.clone().borrow_mut() = Some(tool_call_result.id);
                    end_turn_rx.take().unwrap().await.ok();

                    Ok(())
                }
            })
        });

        let request = thread.update(cx, |thread, cx| {
            thread.send_raw("Fetch https://example.com", cx)
        });

        run_until_first_tool_call(&thread, cx).await;

        // Entry 0 is the user message; entry 1 is the pushed tool call.
        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::Allowed {
                        status: acp_old::ToolCallStatus::Running,
                        ..
                    },
                    ..
                })
            ));
        });

        cx.run_until_parked();

        thread
            .update(cx, |thread, cx| thread.cancel(cx))
            .await
            .unwrap();

        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                &thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::Canceled,
                    ..
                })
            ));
        });

        // The agent reports the call as finished even though it was canceled.
        fake_server
            .update(cx, |fake_server, _| {
                fake_server.send_to_zed(acp_old::UpdateToolCallParams {
                    tool_call_id: tool_call_id.borrow().unwrap(),
                    status: acp_old::ToolCallStatus::Finished,
                    content: None,
                })
            })
            .await
            .unwrap();

        drop(end_turn_tx);
        request.await.unwrap();

        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::Allowed {
                        status: acp_old::ToolCallStatus::Finished,
                        ..
                    },
                    ..
                })
            ));
        });
    }
1722
    /// Waits until the thread contains a tool call entry and returns the index of
    /// the first one.
    ///
    /// # Panics
    ///
    /// Panics if no tool call shows up within 10 seconds.
    async fn run_until_first_tool_call(
        thread: &Entity<AcpThread>,
        cx: &mut TestAppContext,
    ) -> usize {
        let (mut tx, mut rx) = mpsc::channel::<usize>(1);

        // On every thread event, look for the first tool call entry and report its index.
        let subscription = cx.update(|cx| {
            cx.subscribe(thread, move |thread, _, cx| {
                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
                        // NOTE(review): `try_send(..).unwrap()` panics if the channel
                        // is full, e.g. if several events fire before the receiver
                        // below is polled — confirm this cannot happen in practice.
                        return tx.try_send(ix).unwrap();
                    }
                }
            })
        });

        select! {
            _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
                panic!("Timeout waiting for tool call")
            }
            ix = rx.next().fuse() => {
                drop(subscription);
                ix.unwrap()
            }
        }
    }
1749
    /// Creates an [`AcpThread`] wired to a [`FakeAcpServer`] through two in-memory
    /// pipes, and returns both entities.
    pub fn fake_acp_thread(
        project: Entity<Project>,
        cx: &mut TestAppContext,
    ) -> (Entity<AcpThread>, Entity<FakeAcpServer>) {
        // One pipe per direction: the thread writes to `stdin_tx` and reads from
        // `stdout_rx`; the fake server gets the opposite ends.
        let (stdin_tx, stdin_rx) = async_pipe::pipe();
        let (stdout_tx, stdout_rx) = async_pipe::pipe();

        let thread = cx.new(|cx| {
            let foreground_executor = cx.foreground_executor().clone();
            let (connection, io_fut) = acp_old::AgentConnection::connect_to_agent(
                OldAcpClientDelegate::new(cx.entity().downgrade(), cx.to_async()),
                stdin_tx,
                stdout_rx,
                move |fut| {
                    foreground_executor.spawn(fut).detach();
                },
            );

            // I/O errors are only logged; the task itself always reports success.
            let io_task = cx.background_spawn({
                async move {
                    io_fut.await.log_err();
                    Ok(())
                }
            });
            AcpThread::new(connection, "Test".into(), Some(io_task), project, cx)
        });
        let agent = cx.update(|cx| cx.new(|cx| FakeAcpServer::new(stdin_rx, stdout_tx, cx)));
        (thread, agent)
    }
1779
    /// In-process stand-in for an agent, used by the tests in this module.
    pub struct FakeAcpServer {
        /// Connection back to the client side.
        connection: acp_old::ClientConnection,

        /// Held only so the task stays owned by the server.
        _io_task: Task<()>,
        /// Handler invoked for each user message. It receives the request, the
        /// server entity and an async app context. `None` until a test installs one.
        on_user_message: Option<
            Rc<
                dyn Fn(
                    acp_old::SendUserMessageParams,
                    Entity<FakeAcpServer>,
                    AsyncApp,
                ) -> LocalBoxFuture<'static, Result<(), acp_old::Error>>,
            >,
        >,
    }
1794
1795    #[derive(Clone)]
1796    struct FakeAgent {
1797        server: Entity<FakeAcpServer>,
1798        cx: AsyncApp,
1799    }
1800
1801    impl acp_old::Agent for FakeAgent {
1802        async fn initialize(
1803            &self,
1804            params: acp_old::InitializeParams,
1805        ) -> Result<acp_old::InitializeResponse, acp_old::Error> {
1806            Ok(acp_old::InitializeResponse {
1807                protocol_version: params.protocol_version,
1808                is_authenticated: true,
1809            })
1810        }
1811
1812        async fn authenticate(&self) -> Result<(), acp_old::Error> {
1813            Ok(())
1814        }
1815
1816        async fn cancel_send_message(&self) -> Result<(), acp_old::Error> {
1817            Ok(())
1818        }
1819
1820        async fn send_user_message(
1821            &self,
1822            request: acp_old::SendUserMessageParams,
1823        ) -> Result<(), acp_old::Error> {
1824            let mut cx = self.cx.clone();
1825            let handler = self
1826                .server
1827                .update(&mut cx, |server, _| server.on_user_message.clone())
1828                .ok()
1829                .flatten();
1830            if let Some(handler) = handler {
1831                handler(request, self.server.clone(), self.cx.clone()).await
1832            } else {
1833                Err(anyhow::anyhow!("No handler for on_user_message").into())
1834            }
1835        }
1836    }
1837
1838    impl FakeAcpServer {
1839        fn new(stdin: PipeReader, stdout: PipeWriter, cx: &Context<Self>) -> Self {
1840            let agent = FakeAgent {
1841                server: cx.entity(),
1842                cx: cx.to_async(),
1843            };
1844            let foreground_executor = cx.foreground_executor().clone();
1845
1846            let (connection, io_fut) = acp_old::ClientConnection::connect_to_client(
1847                agent.clone(),
1848                stdout,
1849                stdin,
1850                move |fut| {
1851                    foreground_executor.spawn(fut).detach();
1852                },
1853            );
1854            FakeAcpServer {
1855                connection: connection,
1856                on_user_message: None,
1857                _io_task: cx.background_spawn(async move {
1858                    io_fut.await.log_err();
1859                }),
1860            }
1861        }
1862
1863        fn on_user_message<F>(
1864            &mut self,
1865            handler: impl for<'a> Fn(
1866                acp_old::SendUserMessageParams,
1867                Entity<FakeAcpServer>,
1868                AsyncApp,
1869            ) -> F
1870            + 'static,
1871        ) where
1872            F: Future<Output = Result<(), acp_old::Error>> + 'static,
1873        {
1874            self.on_user_message
1875                .replace(Rc::new(move |request, server, cx| {
1876                    handler(request, server, cx).boxed_local()
1877                }));
1878        }
1879
1880        fn send_to_zed<T: acp_old::ClientRequest + 'static>(
1881            &self,
1882            message: T,
1883        ) -> BoxedLocal<Result<T::Response>> {
1884            self.connection
1885                .request(message)
1886                .map(|f| f.map_err(|err| anyhow!(err)))
1887                .boxed_local()
1888        }
1889    }
1890}