1pub use acp::ToolCallId;
2use agent_servers::AgentServer;
3use agentic_coding_protocol::{self as acp, UserMessageChunk};
4use anyhow::{Context as _, Result, anyhow};
5use buffer_diff::BufferDiff;
6use editor::{MultiBuffer, PathKey};
7use futures::{FutureExt, channel::oneshot, future::BoxFuture};
8use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
9use itertools::Itertools;
10use language::{Anchor, Buffer, Capability, LanguageRegistry, OffsetRangeExt as _};
11use markdown::Markdown;
12use project::Project;
13use std::error::Error;
14use std::fmt::{Formatter, Write};
15use std::{
16 fmt::Display,
17 mem,
18 path::{Path, PathBuf},
19 sync::Arc,
20};
21use ui::{App, IconName};
22use util::ResultExt;
23
24#[derive(Clone, Debug, Eq, PartialEq)]
25pub struct UserMessage {
26 pub content: Entity<Markdown>,
27}
28
29impl UserMessage {
30 pub fn from_acp(
31 message: &acp::SendUserMessageParams,
32 language_registry: Arc<LanguageRegistry>,
33 cx: &mut App,
34 ) -> Self {
35 let mut md_source = String::new();
36
37 for chunk in &message.chunks {
38 match chunk {
39 UserMessageChunk::Text { text } => md_source.push_str(&text),
40 UserMessageChunk::Path { path } => {
41 write!(&mut md_source, "{}", MentionPath(&path)).unwrap()
42 }
43 }
44 }
45
46 Self {
47 content: cx
48 .new(|cx| Markdown::new(md_source.into(), Some(language_registry), None, cx)),
49 }
50 }
51
52 fn to_markdown(&self, cx: &App) -> String {
53 format!("## User\n\n{}\n\n", self.content.read(cx).source())
54 }
55}
56
57#[derive(Debug)]
58pub struct MentionPath<'a>(&'a Path);
59
60impl<'a> MentionPath<'a> {
61 const PREFIX: &'static str = "@file:";
62
63 pub fn new(path: &'a Path) -> Self {
64 MentionPath(path)
65 }
66
67 pub fn try_parse(url: &'a str) -> Option<Self> {
68 let path = url.strip_prefix(Self::PREFIX)?;
69 Some(MentionPath(Path::new(path)))
70 }
71
72 pub fn path(&self) -> &Path {
73 self.0
74 }
75}
76
77impl Display for MentionPath<'_> {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 write!(
80 f,
81 "[@{}]({}{})",
82 self.0.file_name().unwrap_or_default().display(),
83 Self::PREFIX,
84 self.0.display()
85 )
86 }
87}
88
89#[derive(Clone, Debug, Eq, PartialEq)]
90pub struct AssistantMessage {
91 pub chunks: Vec<AssistantMessageChunk>,
92}
93
94impl AssistantMessage {
95 fn to_markdown(&self, cx: &App) -> String {
96 format!(
97 "## Assistant\n\n{}\n\n",
98 self.chunks
99 .iter()
100 .map(|chunk| chunk.to_markdown(cx))
101 .join("\n\n")
102 )
103 }
104}
105
106#[derive(Clone, Debug, Eq, PartialEq)]
107pub enum AssistantMessageChunk {
108 Text { chunk: Entity<Markdown> },
109 Thought { chunk: Entity<Markdown> },
110}
111
112impl AssistantMessageChunk {
113 pub fn from_acp(
114 chunk: acp::AssistantMessageChunk,
115 language_registry: Arc<LanguageRegistry>,
116 cx: &mut App,
117 ) -> Self {
118 match chunk {
119 acp::AssistantMessageChunk::Text { text } => Self::Text {
120 chunk: cx.new(|cx| Markdown::new(text.into(), Some(language_registry), None, cx)),
121 },
122 acp::AssistantMessageChunk::Thought { thought } => Self::Thought {
123 chunk: cx
124 .new(|cx| Markdown::new(thought.into(), Some(language_registry), None, cx)),
125 },
126 }
127 }
128
129 pub fn from_str(chunk: &str, language_registry: Arc<LanguageRegistry>, cx: &mut App) -> Self {
130 Self::Text {
131 chunk: cx.new(|cx| {
132 Markdown::new(chunk.to_owned().into(), Some(language_registry), None, cx)
133 }),
134 }
135 }
136
137 fn to_markdown(&self, cx: &App) -> String {
138 match self {
139 Self::Text { chunk } => chunk.read(cx).source().to_string(),
140 Self::Thought { chunk } => {
141 format!("<thinking>\n{}\n</thinking>", chunk.read(cx).source())
142 }
143 }
144 }
145}
146
147#[derive(Debug)]
148pub enum AgentThreadEntry {
149 UserMessage(UserMessage),
150 AssistantMessage(AssistantMessage),
151 ToolCall(ToolCall),
152}
153
154impl AgentThreadEntry {
155 fn to_markdown(&self, cx: &App) -> String {
156 match self {
157 Self::UserMessage(message) => message.to_markdown(cx),
158 Self::AssistantMessage(message) => message.to_markdown(cx),
159 Self::ToolCall(too_call) => too_call.to_markdown(cx),
160 }
161 }
162}
163
164#[derive(Debug)]
165pub struct ToolCall {
166 pub id: acp::ToolCallId,
167 pub label: Entity<Markdown>,
168 pub icon: IconName,
169 pub content: Option<ToolCallContent>,
170 pub status: ToolCallStatus,
171}
172
173impl ToolCall {
174 fn to_markdown(&self, cx: &App) -> String {
175 let mut markdown = format!(
176 "**Tool Call: {}**\nStatus: {}\n\n",
177 self.label.read(cx).source(),
178 self.status
179 );
180 if let Some(content) = &self.content {
181 markdown.push_str(content.to_markdown(cx).as_str());
182 markdown.push_str("\n\n");
183 }
184 markdown
185 }
186}
187
188#[derive(Debug)]
189pub enum ToolCallStatus {
190 WaitingForConfirmation {
191 confirmation: ToolCallConfirmation,
192 respond_tx: oneshot::Sender<acp::ToolCallConfirmationOutcome>,
193 },
194 Allowed {
195 status: acp::ToolCallStatus,
196 },
197 Rejected,
198 Canceled,
199}
200
201impl Display for ToolCallStatus {
202 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
203 write!(
204 f,
205 "{}",
206 match self {
207 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
208 ToolCallStatus::Allowed { status } => match status {
209 acp::ToolCallStatus::Running => "Running",
210 acp::ToolCallStatus::Finished => "Finished",
211 acp::ToolCallStatus::Error => "Error",
212 },
213 ToolCallStatus::Rejected => "Rejected",
214 ToolCallStatus::Canceled => "Canceled",
215 }
216 )
217 }
218}
219
220#[derive(Debug)]
221pub enum ToolCallConfirmation {
222 Edit {
223 description: Option<Entity<Markdown>>,
224 },
225 Execute {
226 command: String,
227 root_command: String,
228 description: Option<Entity<Markdown>>,
229 },
230 Mcp {
231 server_name: String,
232 tool_name: String,
233 tool_display_name: String,
234 description: Option<Entity<Markdown>>,
235 },
236 Fetch {
237 urls: Vec<SharedString>,
238 description: Option<Entity<Markdown>>,
239 },
240 Other {
241 description: Entity<Markdown>,
242 },
243}
244
245impl ToolCallConfirmation {
246 pub fn from_acp(
247 confirmation: acp::ToolCallConfirmation,
248 language_registry: Arc<LanguageRegistry>,
249 cx: &mut App,
250 ) -> Self {
251 let to_md = |description: String, cx: &mut App| -> Entity<Markdown> {
252 cx.new(|cx| {
253 Markdown::new(
254 description.into(),
255 Some(language_registry.clone()),
256 None,
257 cx,
258 )
259 })
260 };
261
262 match confirmation {
263 acp::ToolCallConfirmation::Edit { description } => Self::Edit {
264 description: description.map(|description| to_md(description, cx)),
265 },
266 acp::ToolCallConfirmation::Execute {
267 command,
268 root_command,
269 description,
270 } => Self::Execute {
271 command,
272 root_command,
273 description: description.map(|description| to_md(description, cx)),
274 },
275 acp::ToolCallConfirmation::Mcp {
276 server_name,
277 tool_name,
278 tool_display_name,
279 description,
280 } => Self::Mcp {
281 server_name,
282 tool_name,
283 tool_display_name,
284 description: description.map(|description| to_md(description, cx)),
285 },
286 acp::ToolCallConfirmation::Fetch { urls, description } => Self::Fetch {
287 urls: urls.iter().map(|url| url.into()).collect(),
288 description: description.map(|description| to_md(description, cx)),
289 },
290 acp::ToolCallConfirmation::Other { description } => Self::Other {
291 description: to_md(description, cx),
292 },
293 }
294 }
295}
296
297#[derive(Debug)]
298pub enum ToolCallContent {
299 Markdown { markdown: Entity<Markdown> },
300 Diff { diff: Diff },
301}
302
303impl ToolCallContent {
304 pub fn from_acp(
305 content: acp::ToolCallContent,
306 language_registry: Arc<LanguageRegistry>,
307 cx: &mut App,
308 ) -> Self {
309 match content {
310 acp::ToolCallContent::Markdown { markdown } => Self::Markdown {
311 markdown: cx.new(|cx| Markdown::new_text(markdown.into(), cx)),
312 },
313 acp::ToolCallContent::Diff { diff } => Self::Diff {
314 diff: Diff::from_acp(diff, language_registry, cx),
315 },
316 }
317 }
318
319 fn to_markdown(&self, cx: &App) -> String {
320 match self {
321 Self::Markdown { markdown } => markdown.read(cx).source().to_string(),
322 Self::Diff { diff } => diff.to_markdown(cx),
323 }
324 }
325}
326
327#[derive(Debug)]
328pub struct Diff {
329 pub multibuffer: Entity<MultiBuffer>,
330 pub path: PathBuf,
331 _task: Task<Result<()>>,
332}
333
334impl Diff {
335 pub fn from_acp(
336 diff: acp::Diff,
337 language_registry: Arc<LanguageRegistry>,
338 cx: &mut App,
339 ) -> Self {
340 let acp::Diff {
341 path,
342 old_text,
343 new_text,
344 } = diff;
345
346 let multibuffer = cx.new(|_cx| MultiBuffer::without_headers(Capability::ReadOnly));
347
348 let new_buffer = cx.new(|cx| Buffer::local(new_text, cx));
349 let old_buffer = cx.new(|cx| Buffer::local(old_text.unwrap_or("".into()), cx));
350 let new_buffer_snapshot = new_buffer.read(cx).text_snapshot();
351 let old_buffer_snapshot = old_buffer.read(cx).snapshot();
352 let buffer_diff = cx.new(|cx| BufferDiff::new(&new_buffer_snapshot, cx));
353 let diff_task = buffer_diff.update(cx, |diff, cx| {
354 diff.set_base_text(
355 old_buffer_snapshot,
356 Some(language_registry.clone()),
357 new_buffer_snapshot,
358 cx,
359 )
360 });
361
362 let task = cx.spawn({
363 let multibuffer = multibuffer.clone();
364 let path = path.clone();
365 async move |cx| {
366 diff_task.await?;
367
368 multibuffer
369 .update(cx, |multibuffer, cx| {
370 let hunk_ranges = {
371 let buffer = new_buffer.read(cx);
372 let diff = buffer_diff.read(cx);
373 diff.hunks_intersecting_range(Anchor::MIN..Anchor::MAX, &buffer, cx)
374 .map(|diff_hunk| diff_hunk.buffer_range.to_point(&buffer))
375 .collect::<Vec<_>>()
376 };
377
378 multibuffer.set_excerpts_for_path(
379 PathKey::for_buffer(&new_buffer, cx),
380 new_buffer.clone(),
381 hunk_ranges,
382 editor::DEFAULT_MULTIBUFFER_CONTEXT,
383 cx,
384 );
385 multibuffer.add_diff(buffer_diff.clone(), cx);
386 })
387 .log_err();
388
389 if let Some(language) = language_registry
390 .language_for_file_path(&path)
391 .await
392 .log_err()
393 {
394 new_buffer.update(cx, |buffer, cx| buffer.set_language(Some(language), cx))?;
395 }
396
397 anyhow::Ok(())
398 }
399 });
400
401 Self {
402 multibuffer,
403 path,
404 _task: task,
405 }
406 }
407
408 fn to_markdown(&self, cx: &App) -> String {
409 let buffer_text = self
410 .multibuffer
411 .read(cx)
412 .all_buffers()
413 .iter()
414 .map(|buffer| buffer.read(cx).text())
415 .join("\n");
416 format!("Diff: {}\n```\n{}\n```\n", self.path.display(), buffer_text)
417 }
418}
419
420pub struct AcpThread {
421 entries: Vec<AgentThreadEntry>,
422 title: SharedString,
423 project: Entity<Project>,
424 send_task: Option<Task<()>>,
425 connection: Arc<acp::AgentConnection>,
426 child_status: Option<Task<Result<()>>>,
427 _io_task: Task<()>,
428}
429
430pub enum AcpThreadEvent {
431 NewEntry,
432 EntryUpdated(usize),
433}
434
435impl EventEmitter<AcpThreadEvent> for AcpThread {}
436
437#[derive(PartialEq, Eq)]
438pub enum ThreadStatus {
439 Idle,
440 WaitingForToolConfirmation,
441 Generating,
442}
443
444#[derive(Debug, Clone)]
445pub enum LoadError {
446 Unsupported { current_version: SharedString },
447 Exited(i32),
448 Other(SharedString),
449}
450
451impl Display for LoadError {
452 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
453 match self {
454 LoadError::Unsupported { current_version } => {
455 write!(
456 f,
457 "Your installed version of Gemini {} doesn't support the Agentic Coding Protocol (ACP).",
458 current_version
459 )
460 }
461 LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
462 LoadError::Other(msg) => write!(f, "{}", msg),
463 }
464 }
465}
466
467impl Error for LoadError {}
468
469impl AcpThread {
470 pub async fn spawn(
471 server: impl AgentServer + 'static,
472 root_dir: &Path,
473 project: Entity<Project>,
474 cx: &mut AsyncApp,
475 ) -> Result<Entity<Self>> {
476 let command = match server.command(&project, cx).await {
477 Ok(command) => command,
478 Err(e) => return Err(anyhow!(LoadError::Other(format!("{e}").into()))),
479 };
480
481 let mut child = util::command::new_smol_command(&command.path)
482 .args(command.args.iter())
483 .current_dir(root_dir)
484 .stdin(std::process::Stdio::piped())
485 .stdout(std::process::Stdio::piped())
486 .stderr(std::process::Stdio::inherit())
487 .kill_on_drop(true)
488 .spawn()?;
489
490 let stdin = child.stdin.take().unwrap();
491 let stdout = child.stdout.take().unwrap();
492
493 cx.new(|cx| {
494 let foreground_executor = cx.foreground_executor().clone();
495
496 let (connection, io_fut) = acp::AgentConnection::connect_to_agent(
497 AcpClientDelegate::new(cx.entity().downgrade(), cx.to_async()),
498 stdin,
499 stdout,
500 move |fut| foreground_executor.spawn(fut).detach(),
501 );
502
503 let io_task = cx.background_spawn(async move {
504 io_fut.await.log_err();
505 });
506
507 let child_status = cx.background_spawn(async move {
508 match child.status().await {
509 Err(e) => Err(anyhow!(e)),
510 Ok(result) if result.success() => Ok(()),
511 Ok(result) => {
512 if let Some(version) = server.version(&command).await.log_err()
513 && !version.supported
514 {
515 Err(anyhow!(LoadError::Unsupported {
516 current_version: version.current_version
517 }))
518 } else {
519 Err(anyhow!(LoadError::Exited(result.code().unwrap_or(-127))))
520 }
521 }
522 }
523 });
524
525 Self {
526 entries: Default::default(),
527 title: "ACP Thread".into(),
528 project,
529 send_task: None,
530 connection: Arc::new(connection),
531 child_status: Some(child_status),
532 _io_task: io_task,
533 }
534 })
535 }
536
537 #[cfg(test)]
538 pub fn fake(
539 stdin: async_pipe::PipeWriter,
540 stdout: async_pipe::PipeReader,
541 project: Entity<Project>,
542 cx: &mut Context<Self>,
543 ) -> Self {
544 let foreground_executor = cx.foreground_executor().clone();
545
546 let (connection, io_fut) = acp::AgentConnection::connect_to_agent(
547 AcpClientDelegate::new(cx.entity().downgrade(), cx.to_async()),
548 stdin,
549 stdout,
550 move |fut| {
551 foreground_executor.spawn(fut).detach();
552 },
553 );
554
555 let io_task = cx.background_spawn({
556 async move {
557 io_fut.await.log_err();
558 }
559 });
560
561 Self {
562 entries: Default::default(),
563 title: "ACP Thread".into(),
564 project,
565 send_task: None,
566 connection: Arc::new(connection),
567 child_status: None,
568 _io_task: io_task,
569 }
570 }
571
572 pub fn title(&self) -> SharedString {
573 self.title.clone()
574 }
575
576 pub fn entries(&self) -> &[AgentThreadEntry] {
577 &self.entries
578 }
579
580 pub fn status(&self) -> ThreadStatus {
581 if self.send_task.is_some() {
582 if self.waiting_for_tool_confirmation() {
583 ThreadStatus::WaitingForToolConfirmation
584 } else {
585 ThreadStatus::Generating
586 }
587 } else {
588 ThreadStatus::Idle
589 }
590 }
591
592 pub fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
593 self.entries.push(entry);
594 cx.emit(AcpThreadEvent::NewEntry);
595 }
596
597 pub fn push_assistant_chunk(
598 &mut self,
599 chunk: acp::AssistantMessageChunk,
600 cx: &mut Context<Self>,
601 ) {
602 let entries_len = self.entries.len();
603 if let Some(last_entry) = self.entries.last_mut()
604 && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
605 {
606 cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
607
608 match (chunks.last_mut(), &chunk) {
609 (
610 Some(AssistantMessageChunk::Text { chunk: old_chunk }),
611 acp::AssistantMessageChunk::Text { text: new_chunk },
612 )
613 | (
614 Some(AssistantMessageChunk::Thought { chunk: old_chunk }),
615 acp::AssistantMessageChunk::Thought { thought: new_chunk },
616 ) => {
617 old_chunk.update(cx, |old_chunk, cx| {
618 old_chunk.append(&new_chunk, cx);
619 });
620 }
621 _ => {
622 chunks.push(AssistantMessageChunk::from_acp(
623 chunk,
624 self.project.read(cx).languages().clone(),
625 cx,
626 ));
627 }
628 }
629 } else {
630 let chunk = AssistantMessageChunk::from_acp(
631 chunk,
632 self.project.read(cx).languages().clone(),
633 cx,
634 );
635
636 self.push_entry(
637 AgentThreadEntry::AssistantMessage(AssistantMessage {
638 chunks: vec![chunk],
639 }),
640 cx,
641 );
642 }
643 }
644
645 pub fn request_tool_call(
646 &mut self,
647 label: String,
648 icon: acp::Icon,
649 content: Option<acp::ToolCallContent>,
650 confirmation: acp::ToolCallConfirmation,
651 cx: &mut Context<Self>,
652 ) -> ToolCallRequest {
653 let (tx, rx) = oneshot::channel();
654
655 let status = ToolCallStatus::WaitingForConfirmation {
656 confirmation: ToolCallConfirmation::from_acp(
657 confirmation,
658 self.project.read(cx).languages().clone(),
659 cx,
660 ),
661 respond_tx: tx,
662 };
663
664 let id = self.insert_tool_call(label, status, icon, content, cx);
665 ToolCallRequest { id, outcome: rx }
666 }
667
668 pub fn push_tool_call(
669 &mut self,
670 label: String,
671 icon: acp::Icon,
672 content: Option<acp::ToolCallContent>,
673 cx: &mut Context<Self>,
674 ) -> acp::ToolCallId {
675 let status = ToolCallStatus::Allowed {
676 status: acp::ToolCallStatus::Running,
677 };
678
679 self.insert_tool_call(label, status, icon, content, cx)
680 }
681
682 fn insert_tool_call(
683 &mut self,
684 label: String,
685 status: ToolCallStatus,
686 icon: acp::Icon,
687 content: Option<acp::ToolCallContent>,
688 cx: &mut Context<Self>,
689 ) -> acp::ToolCallId {
690 let language_registry = self.project.read(cx).languages().clone();
691 let id = acp::ToolCallId(self.entries.len() as u64);
692
693 self.push_entry(
694 AgentThreadEntry::ToolCall(ToolCall {
695 id,
696 label: cx.new(|cx| {
697 Markdown::new(label.into(), Some(language_registry.clone()), None, cx)
698 }),
699 icon: acp_icon_to_ui_icon(icon),
700 content: content
701 .map(|content| ToolCallContent::from_acp(content, language_registry, cx)),
702 status,
703 }),
704 cx,
705 );
706
707 id
708 }
709
710 pub fn authorize_tool_call(
711 &mut self,
712 id: acp::ToolCallId,
713 outcome: acp::ToolCallConfirmationOutcome,
714 cx: &mut Context<Self>,
715 ) {
716 let Some((ix, call)) = self.tool_call_mut(id) else {
717 return;
718 };
719
720 let new_status = if outcome == acp::ToolCallConfirmationOutcome::Reject {
721 ToolCallStatus::Rejected
722 } else {
723 ToolCallStatus::Allowed {
724 status: acp::ToolCallStatus::Running,
725 }
726 };
727
728 let curr_status = mem::replace(&mut call.status, new_status);
729
730 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
731 respond_tx.send(outcome).log_err();
732 } else if cfg!(debug_assertions) {
733 panic!("tried to authorize an already authorized tool call");
734 }
735
736 cx.emit(AcpThreadEvent::EntryUpdated(ix));
737 }
738
739 pub fn update_tool_call(
740 &mut self,
741 id: acp::ToolCallId,
742 new_status: acp::ToolCallStatus,
743 new_content: Option<acp::ToolCallContent>,
744 cx: &mut Context<Self>,
745 ) -> Result<()> {
746 let language_registry = self.project.read(cx).languages().clone();
747 let (ix, call) = self.tool_call_mut(id).context("Entry not found")?;
748
749 call.content = new_content
750 .map(|new_content| ToolCallContent::from_acp(new_content, language_registry, cx));
751
752 match &mut call.status {
753 ToolCallStatus::Allowed { status } => {
754 *status = new_status;
755 }
756 ToolCallStatus::WaitingForConfirmation { .. } => {
757 anyhow::bail!("Tool call hasn't been authorized yet")
758 }
759 ToolCallStatus::Rejected => {
760 anyhow::bail!("Tool call was rejected and therefore can't be updated")
761 }
762 ToolCallStatus::Canceled => {
763 call.status = ToolCallStatus::Allowed { status: new_status };
764 }
765 }
766
767 cx.emit(AcpThreadEvent::EntryUpdated(ix));
768 Ok(())
769 }
770
771 fn tool_call_mut(&mut self, id: acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
772 let entry = self.entries.get_mut(id.0 as usize);
773 debug_assert!(
774 entry.is_some(),
775 "We shouldn't give out ids to entries that don't exist"
776 );
777 match entry {
778 Some(AgentThreadEntry::ToolCall(call)) if call.id == id => Some((id.0 as usize, call)),
779 _ => {
780 if cfg!(debug_assertions) {
781 panic!("entry is not a tool call");
782 }
783 None
784 }
785 }
786 }
787
788 /// Returns true if the last turn is awaiting tool authorization
789 pub fn waiting_for_tool_confirmation(&self) -> bool {
790 for entry in self.entries.iter().rev() {
791 match &entry {
792 AgentThreadEntry::ToolCall(call) => match call.status {
793 ToolCallStatus::WaitingForConfirmation { .. } => return true,
794 ToolCallStatus::Allowed { .. }
795 | ToolCallStatus::Rejected
796 | ToolCallStatus::Canceled => continue,
797 },
798 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
799 // Reached the beginning of the turn
800 return false;
801 }
802 }
803 }
804 false
805 }
806
807 pub fn initialize(&self) -> impl use<> + Future<Output = Result<acp::InitializeResponse>> {
808 let connection = self.connection.clone();
809 async move { Ok(connection.request(acp::InitializeParams).await?) }
810 }
811
812 pub fn authenticate(&self) -> impl use<> + Future<Output = Result<()>> {
813 let connection = self.connection.clone();
814 async move { Ok(connection.request(acp::AuthenticateParams).await?) }
815 }
816
817 #[cfg(test)]
818 pub fn send_raw(
819 &mut self,
820 message: &str,
821 cx: &mut Context<Self>,
822 ) -> BoxFuture<'static, Result<()>> {
823 self.send(
824 acp::SendUserMessageParams {
825 chunks: vec![acp::UserMessageChunk::Text {
826 text: message.to_string(),
827 }],
828 },
829 cx,
830 )
831 }
832
833 pub fn send(
834 &mut self,
835 message: acp::SendUserMessageParams,
836 cx: &mut Context<Self>,
837 ) -> BoxFuture<'static, Result<()>> {
838 let agent = self.connection.clone();
839 self.push_entry(
840 AgentThreadEntry::UserMessage(UserMessage::from_acp(
841 &message,
842 self.project.read(cx).languages().clone(),
843 cx,
844 )),
845 cx,
846 );
847
848 let (tx, rx) = oneshot::channel();
849 let cancel = self.cancel(cx);
850
851 self.send_task = Some(cx.spawn(async move |this, cx| {
852 cancel.await.log_err();
853
854 let result = agent.request(message).await;
855 tx.send(result).log_err();
856 this.update(cx, |this, _cx| this.send_task.take()).log_err();
857 }));
858
859 async move {
860 match rx.await {
861 Ok(Err(e)) => Err(e)?,
862 _ => Ok(()),
863 }
864 }
865 .boxed()
866 }
867
868 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
869 let agent = self.connection.clone();
870
871 if self.send_task.take().is_some() {
872 cx.spawn(async move |this, cx| {
873 agent.request(acp::CancelSendMessageParams).await?;
874
875 this.update(cx, |this, _cx| {
876 for entry in this.entries.iter_mut() {
877 if let AgentThreadEntry::ToolCall(call) = entry {
878 let cancel = matches!(
879 call.status,
880 ToolCallStatus::WaitingForConfirmation { .. }
881 | ToolCallStatus::Allowed {
882 status: acp::ToolCallStatus::Running
883 }
884 );
885
886 if cancel {
887 let curr_status =
888 mem::replace(&mut call.status, ToolCallStatus::Canceled);
889
890 if let ToolCallStatus::WaitingForConfirmation {
891 respond_tx, ..
892 } = curr_status
893 {
894 respond_tx
895 .send(acp::ToolCallConfirmationOutcome::Cancel)
896 .ok();
897 }
898 }
899 }
900 }
901 })
902 })
903 } else {
904 Task::ready(Ok(()))
905 }
906 }
907
908 pub fn child_status(&mut self) -> Option<Task<Result<()>>> {
909 self.child_status.take()
910 }
911
912 pub fn to_markdown(&self, cx: &App) -> String {
913 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
914 }
915}
916
917struct AcpClientDelegate {
918 thread: WeakEntity<AcpThread>,
919 cx: AsyncApp,
920 // sent_buffer_versions: HashMap<Entity<Buffer>, HashMap<u64, BufferSnapshot>>,
921}
922
923impl AcpClientDelegate {
924 fn new(thread: WeakEntity<AcpThread>, cx: AsyncApp) -> Self {
925 Self { thread, cx }
926 }
927}
928
929impl acp::Client for AcpClientDelegate {
930 async fn stream_assistant_message_chunk(
931 &self,
932 params: acp::StreamAssistantMessageChunkParams,
933 ) -> Result<()> {
934 let cx = &mut self.cx.clone();
935
936 cx.update(|cx| {
937 self.thread
938 .update(cx, |thread, cx| {
939 thread.push_assistant_chunk(params.chunk, cx)
940 })
941 .ok();
942 })?;
943
944 Ok(())
945 }
946
947 async fn request_tool_call_confirmation(
948 &self,
949 request: acp::RequestToolCallConfirmationParams,
950 ) -> Result<acp::RequestToolCallConfirmationResponse> {
951 let cx = &mut self.cx.clone();
952 let ToolCallRequest { id, outcome } = cx
953 .update(|cx| {
954 self.thread.update(cx, |thread, cx| {
955 thread.request_tool_call(
956 request.label,
957 request.icon,
958 request.content,
959 request.confirmation,
960 cx,
961 )
962 })
963 })?
964 .context("Failed to update thread")?;
965
966 Ok(acp::RequestToolCallConfirmationResponse {
967 id,
968 outcome: outcome.await?,
969 })
970 }
971
972 async fn push_tool_call(
973 &self,
974 request: acp::PushToolCallParams,
975 ) -> Result<acp::PushToolCallResponse> {
976 let cx = &mut self.cx.clone();
977 let id = cx
978 .update(|cx| {
979 self.thread.update(cx, |thread, cx| {
980 thread.push_tool_call(request.label, request.icon, request.content, cx)
981 })
982 })?
983 .context("Failed to update thread")?;
984
985 Ok(acp::PushToolCallResponse { id })
986 }
987
988 async fn update_tool_call(&self, request: acp::UpdateToolCallParams) -> Result<()> {
989 let cx = &mut self.cx.clone();
990
991 cx.update(|cx| {
992 self.thread.update(cx, |thread, cx| {
993 thread.update_tool_call(request.tool_call_id, request.status, request.content, cx)
994 })
995 })?
996 .context("Failed to update thread")??;
997
998 Ok(())
999 }
1000}
1001
1002fn acp_icon_to_ui_icon(icon: acp::Icon) -> IconName {
1003 match icon {
1004 acp::Icon::FileSearch => IconName::ToolSearch,
1005 acp::Icon::Folder => IconName::ToolFolder,
1006 acp::Icon::Globe => IconName::ToolWeb,
1007 acp::Icon::Hammer => IconName::ToolHammer,
1008 acp::Icon::LightBulb => IconName::ToolBulb,
1009 acp::Icon::Pencil => IconName::ToolPencil,
1010 acp::Icon::Regex => IconName::ToolRegex,
1011 acp::Icon::Terminal => IconName::ToolTerminal,
1012 }
1013}
1014
1015pub struct ToolCallRequest {
1016 pub id: acp::ToolCallId,
1017 pub outcome: oneshot::Receiver<acp::ToolCallConfirmationOutcome>,
1018}
1019
1020#[cfg(test)]
1021mod tests {
1022 use super::*;
1023 use agent_servers::{AgentServerCommand, AgentServerVersion};
1024 use async_pipe::{PipeReader, PipeWriter};
1025 use futures::{channel::mpsc, future::LocalBoxFuture, select};
1026 use gpui::{AsyncApp, TestAppContext};
1027 use indoc::indoc;
1028 use project::FakeFs;
1029 use serde_json::json;
1030 use settings::SettingsStore;
1031 use smol::{future::BoxedLocal, stream::StreamExt as _};
1032 use std::{cell::RefCell, env, path::Path, rc::Rc, time::Duration};
1033 use util::path;
1034
1035 fn init_test(cx: &mut TestAppContext) {
1036 env_logger::try_init().ok();
1037 cx.update(|cx| {
1038 let settings_store = SettingsStore::test(cx);
1039 cx.set_global(settings_store);
1040 Project::init_settings(cx);
1041 language::init(cx);
1042 });
1043 }
1044
1045 #[gpui::test]
1046 async fn test_thinking_concatenation(cx: &mut TestAppContext) {
1047 init_test(cx);
1048
1049 let fs = FakeFs::new(cx.executor());
1050 let project = Project::test(fs, [], cx).await;
1051 let (thread, fake_server) = fake_acp_thread(project, cx);
1052
1053 fake_server.update(cx, |fake_server, _| {
1054 fake_server.on_user_message(move |_, server, mut cx| async move {
1055 server
1056 .update(&mut cx, |server, _| {
1057 server.send_to_zed(acp::StreamAssistantMessageChunkParams {
1058 chunk: acp::AssistantMessageChunk::Thought {
1059 thought: "Thinking ".into(),
1060 },
1061 })
1062 })?
1063 .await
1064 .unwrap();
1065 server
1066 .update(&mut cx, |server, _| {
1067 server.send_to_zed(acp::StreamAssistantMessageChunkParams {
1068 chunk: acp::AssistantMessageChunk::Thought {
1069 thought: "hard!".into(),
1070 },
1071 })
1072 })?
1073 .await
1074 .unwrap();
1075
1076 Ok(())
1077 })
1078 });
1079
1080 thread
1081 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
1082 .await
1083 .unwrap();
1084
1085 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
1086 assert_eq!(
1087 output,
1088 indoc! {r#"
1089 ## User
1090
1091 Hello from Zed!
1092
1093 ## Assistant
1094
1095 <thinking>
1096 Thinking hard!
1097 </thinking>
1098
1099 "#}
1100 );
1101 }
1102
1103 #[gpui::test]
1104 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
1105 init_test(cx);
1106
1107 let fs = FakeFs::new(cx.executor());
1108 let project = Project::test(fs, [], cx).await;
1109 let (thread, fake_server) = fake_acp_thread(project, cx);
1110
1111 let (end_turn_tx, end_turn_rx) = oneshot::channel::<()>();
1112
1113 let tool_call_id = Rc::new(RefCell::new(None));
1114 let end_turn_rx = Rc::new(RefCell::new(Some(end_turn_rx)));
1115 fake_server.update(cx, |fake_server, _| {
1116 let tool_call_id = tool_call_id.clone();
1117 fake_server.on_user_message(move |_, server, mut cx| {
1118 let end_turn_rx = end_turn_rx.clone();
1119 let tool_call_id = tool_call_id.clone();
1120 async move {
1121 let tool_call_result = server
1122 .update(&mut cx, |server, _| {
1123 server.send_to_zed(acp::PushToolCallParams {
1124 label: "Fetch".to_string(),
1125 icon: acp::Icon::Globe,
1126 content: None,
1127 })
1128 })?
1129 .await
1130 .unwrap();
1131 *tool_call_id.clone().borrow_mut() = Some(tool_call_result.id);
1132 end_turn_rx.take().unwrap().await.ok();
1133
1134 Ok(())
1135 }
1136 })
1137 });
1138
1139 let request = thread.update(cx, |thread, cx| {
1140 thread.send_raw("Fetch https://example.com", cx)
1141 });
1142
1143 run_until_first_tool_call(&thread, cx).await;
1144
1145 thread.read_with(cx, |thread, _| {
1146 assert!(matches!(
1147 thread.entries[1],
1148 AgentThreadEntry::ToolCall(ToolCall {
1149 status: ToolCallStatus::Allowed {
1150 status: acp::ToolCallStatus::Running,
1151 ..
1152 },
1153 ..
1154 })
1155 ));
1156 });
1157
1158 cx.run_until_parked();
1159
1160 thread
1161 .update(cx, |thread, cx| thread.cancel(cx))
1162 .await
1163 .unwrap();
1164
1165 thread.read_with(cx, |thread, _| {
1166 assert!(matches!(
1167 &thread.entries[1],
1168 AgentThreadEntry::ToolCall(ToolCall {
1169 status: ToolCallStatus::Canceled,
1170 ..
1171 })
1172 ));
1173 });
1174
1175 fake_server
1176 .update(cx, |fake_server, _| {
1177 fake_server.send_to_zed(acp::UpdateToolCallParams {
1178 tool_call_id: tool_call_id.borrow().unwrap(),
1179 status: acp::ToolCallStatus::Finished,
1180 content: None,
1181 })
1182 })
1183 .await
1184 .unwrap();
1185
1186 drop(end_turn_tx);
1187 request.await.unwrap();
1188
1189 thread.read_with(cx, |thread, _| {
1190 assert!(matches!(
1191 thread.entries[1],
1192 AgentThreadEntry::ToolCall(ToolCall {
1193 status: ToolCallStatus::Allowed {
1194 status: acp::ToolCallStatus::Finished,
1195 ..
1196 },
1197 ..
1198 })
1199 ));
1200 });
1201 }
1202
1203 #[gpui::test]
1204 #[cfg_attr(not(feature = "gemini"), ignore)]
1205 async fn test_gemini_basic(cx: &mut TestAppContext) {
1206 init_test(cx);
1207
1208 cx.executor().allow_parking();
1209
1210 let fs = FakeFs::new(cx.executor());
1211 let project = Project::test(fs, [], cx).await;
1212 let thread = gemini_acp_thread(project.clone(), "/private/tmp", cx).await;
1213 thread
1214 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
1215 .await
1216 .unwrap();
1217
1218 thread.read_with(cx, |thread, _| {
1219 assert_eq!(thread.entries.len(), 2);
1220 assert!(matches!(
1221 thread.entries[0],
1222 AgentThreadEntry::UserMessage(_)
1223 ));
1224 assert!(matches!(
1225 thread.entries[1],
1226 AgentThreadEntry::AssistantMessage(_)
1227 ));
1228 });
1229 }
1230
1231 #[gpui::test]
1232 #[cfg_attr(not(feature = "gemini"), ignore)]
1233 async fn test_gemini_path_mentions(cx: &mut TestAppContext) {
1234 init_test(cx);
1235
1236 cx.executor().allow_parking();
1237 let tempdir = tempfile::tempdir().unwrap();
1238 std::fs::write(
1239 tempdir.path().join("foo.rs"),
1240 indoc! {"
1241 fn main() {
1242 println!(\"Hello, world!\");
1243 }
1244 "},
1245 )
1246 .expect("failed to write file");
1247 let project = Project::example([tempdir.path()], &mut cx.to_async()).await;
1248 let thread = gemini_acp_thread(project.clone(), tempdir.path(), cx).await;
1249 thread
1250 .update(cx, |thread, cx| {
1251 thread.send(
1252 acp::SendUserMessageParams {
1253 chunks: vec![
1254 acp::UserMessageChunk::Text {
1255 text: "Read the file ".into(),
1256 },
1257 acp::UserMessageChunk::Path {
1258 path: Path::new("foo.rs").into(),
1259 },
1260 acp::UserMessageChunk::Text {
1261 text: " and tell me what the content of the println! is".into(),
1262 },
1263 ],
1264 },
1265 cx,
1266 )
1267 })
1268 .await
1269 .unwrap();
1270
1271 thread.read_with(cx, |thread, cx| {
1272 assert_eq!(thread.entries.len(), 3);
1273 assert!(matches!(
1274 thread.entries[0],
1275 AgentThreadEntry::UserMessage(_)
1276 ));
1277 assert!(matches!(thread.entries[1], AgentThreadEntry::ToolCall(_)));
1278 let AgentThreadEntry::AssistantMessage(assistant_message) = &thread.entries[2] else {
1279 panic!("Expected AssistantMessage")
1280 };
1281 assert!(
1282 assistant_message.to_markdown(cx).contains("Hello, world!"),
1283 "unexpected assistant message: {:?}",
1284 assistant_message.to_markdown(cx)
1285 );
1286 });
1287 }
1288
1289 #[gpui::test]
1290 #[cfg_attr(not(feature = "gemini"), ignore)]
1291 async fn test_gemini_tool_call(cx: &mut TestAppContext) {
1292 init_test(cx);
1293
1294 cx.executor().allow_parking();
1295
1296 let fs = FakeFs::new(cx.executor());
1297 fs.insert_tree(
1298 path!("/private/tmp"),
1299 json!({"foo": "Lorem ipsum dolor", "bar": "bar", "baz": "baz"}),
1300 )
1301 .await;
1302 let project = Project::test(fs, [path!("/private/tmp").as_ref()], cx).await;
1303 let thread = gemini_acp_thread(project.clone(), "/private/tmp", cx).await;
1304 thread
1305 .update(cx, |thread, cx| {
1306 thread.send_raw(
1307 "Read the '/private/tmp/foo' file and tell me what you see.",
1308 cx,
1309 )
1310 })
1311 .await
1312 .unwrap();
1313 thread.read_with(cx, |thread, _cx| {
1314 assert!(matches!(
1315 &thread.entries()[2],
1316 AgentThreadEntry::ToolCall(ToolCall {
1317 status: ToolCallStatus::Allowed { .. },
1318 ..
1319 })
1320 ));
1321
1322 assert!(matches!(
1323 thread.entries[3],
1324 AgentThreadEntry::AssistantMessage(_)
1325 ));
1326 });
1327 }
1328
1329 #[gpui::test]
1330 #[cfg_attr(not(feature = "gemini"), ignore)]
1331 async fn test_gemini_tool_call_with_confirmation(cx: &mut TestAppContext) {
1332 init_test(cx);
1333
1334 cx.executor().allow_parking();
1335
1336 let fs = FakeFs::new(cx.executor());
1337 let project = Project::test(fs, [path!("/private/tmp").as_ref()], cx).await;
1338 let thread = gemini_acp_thread(project.clone(), "/private/tmp", cx).await;
1339 let full_turn = thread.update(cx, |thread, cx| {
1340 thread.send_raw(r#"Run `echo "Hello, world!"`"#, cx)
1341 });
1342
1343 run_until_first_tool_call(&thread, cx).await;
1344
1345 let tool_call_id = thread.read_with(cx, |thread, _cx| {
1346 let AgentThreadEntry::ToolCall(ToolCall {
1347 id,
1348 status:
1349 ToolCallStatus::WaitingForConfirmation {
1350 confirmation: ToolCallConfirmation::Execute { root_command, .. },
1351 ..
1352 },
1353 ..
1354 }) = &thread.entries()[2]
1355 else {
1356 panic!();
1357 };
1358
1359 assert_eq!(root_command, "echo");
1360
1361 *id
1362 });
1363
1364 thread.update(cx, |thread, cx| {
1365 thread.authorize_tool_call(tool_call_id, acp::ToolCallConfirmationOutcome::Allow, cx);
1366
1367 assert!(matches!(
1368 &thread.entries()[2],
1369 AgentThreadEntry::ToolCall(ToolCall {
1370 status: ToolCallStatus::Allowed { .. },
1371 ..
1372 })
1373 ));
1374 });
1375
1376 full_turn.await.unwrap();
1377
1378 thread.read_with(cx, |thread, cx| {
1379 let AgentThreadEntry::ToolCall(ToolCall {
1380 content: Some(ToolCallContent::Markdown { markdown }),
1381 status: ToolCallStatus::Allowed { .. },
1382 ..
1383 }) = &thread.entries()[2]
1384 else {
1385 panic!();
1386 };
1387
1388 markdown.read_with(cx, |md, _cx| {
1389 assert!(
1390 md.source().contains("Hello, world!"),
1391 r#"Expected '{}' to contain "Hello, world!""#,
1392 md.source()
1393 );
1394 });
1395 });
1396 }
1397
1398 #[gpui::test]
1399 #[cfg_attr(not(feature = "gemini"), ignore)]
1400 async fn test_gemini_cancel(cx: &mut TestAppContext) {
1401 init_test(cx);
1402
1403 cx.executor().allow_parking();
1404
1405 let fs = FakeFs::new(cx.executor());
1406 let project = Project::test(fs, [path!("/private/tmp").as_ref()], cx).await;
1407 let thread = gemini_acp_thread(project.clone(), "/private/tmp", cx).await;
1408 let full_turn = thread.update(cx, |thread, cx| {
1409 thread.send_raw(r#"Run `echo "Hello, world!"`"#, cx)
1410 });
1411
1412 let first_tool_call_ix = run_until_first_tool_call(&thread, cx).await;
1413
1414 thread.read_with(cx, |thread, _cx| {
1415 let AgentThreadEntry::ToolCall(ToolCall {
1416 id,
1417 status:
1418 ToolCallStatus::WaitingForConfirmation {
1419 confirmation: ToolCallConfirmation::Execute { root_command, .. },
1420 ..
1421 },
1422 ..
1423 }) = &thread.entries()[first_tool_call_ix]
1424 else {
1425 panic!("{:?}", thread.entries()[1]);
1426 };
1427
1428 assert_eq!(root_command, "echo");
1429
1430 *id
1431 });
1432
1433 thread
1434 .update(cx, |thread, cx| thread.cancel(cx))
1435 .await
1436 .unwrap();
1437 full_turn.await.unwrap();
1438 thread.read_with(cx, |thread, _| {
1439 let AgentThreadEntry::ToolCall(ToolCall {
1440 status: ToolCallStatus::Canceled,
1441 ..
1442 }) = &thread.entries()[first_tool_call_ix]
1443 else {
1444 panic!();
1445 };
1446 });
1447
1448 thread
1449 .update(cx, |thread, cx| {
1450 thread.send_raw(r#"Stop running and say goodbye to me."#, cx)
1451 })
1452 .await
1453 .unwrap();
1454 thread.read_with(cx, |thread, _| {
1455 assert!(matches!(
1456 &thread.entries().last().unwrap(),
1457 AgentThreadEntry::AssistantMessage(..),
1458 ))
1459 });
1460 }
1461
1462 async fn run_until_first_tool_call(
1463 thread: &Entity<AcpThread>,
1464 cx: &mut TestAppContext,
1465 ) -> usize {
1466 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
1467
1468 let subscription = cx.update(|cx| {
1469 cx.subscribe(thread, move |thread, _, cx| {
1470 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
1471 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
1472 return tx.try_send(ix).unwrap();
1473 }
1474 }
1475 })
1476 });
1477
1478 select! {
1479 _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
1480 panic!("Timeout waiting for tool call")
1481 }
1482 ix = rx.next().fuse() => {
1483 drop(subscription);
1484 ix.unwrap()
1485 }
1486 }
1487 }
1488
1489 pub async fn gemini_acp_thread(
1490 project: Entity<Project>,
1491 current_dir: impl AsRef<Path>,
1492 cx: &mut TestAppContext,
1493 ) -> Entity<AcpThread> {
1494 struct DevGemini;
1495
1496 impl agent_servers::AgentServer for DevGemini {
1497 async fn command(
1498 &self,
1499 _project: &Entity<Project>,
1500 _cx: &mut AsyncApp,
1501 ) -> Result<agent_servers::AgentServerCommand> {
1502 let cli_path = Path::new(env!("CARGO_MANIFEST_DIR"))
1503 .join("../../../gemini-cli/packages/cli")
1504 .to_string_lossy()
1505 .to_string();
1506
1507 Ok(AgentServerCommand {
1508 path: "node".into(),
1509 args: vec![cli_path, "--acp".into()],
1510 env: None,
1511 })
1512 }
1513
1514 async fn version(
1515 &self,
1516 _command: &agent_servers::AgentServerCommand,
1517 ) -> Result<AgentServerVersion> {
1518 Ok(AgentServerVersion {
1519 current_version: "0.1.0".into(),
1520 supported: true,
1521 })
1522 }
1523 }
1524
1525 let thread = AcpThread::spawn(DevGemini, current_dir.as_ref(), project, &mut cx.to_async())
1526 .await
1527 .unwrap();
1528
1529 thread
1530 .update(cx, |thread, _| thread.initialize())
1531 .await
1532 .unwrap();
1533 thread
1534 }
1535
1536 pub fn fake_acp_thread(
1537 project: Entity<Project>,
1538 cx: &mut TestAppContext,
1539 ) -> (Entity<AcpThread>, Entity<FakeAcpServer>) {
1540 let (stdin_tx, stdin_rx) = async_pipe::pipe();
1541 let (stdout_tx, stdout_rx) = async_pipe::pipe();
1542 let thread = cx.update(|cx| cx.new(|cx| AcpThread::fake(stdin_tx, stdout_rx, project, cx)));
1543 let agent = cx.update(|cx| cx.new(|cx| FakeAcpServer::new(stdin_rx, stdout_tx, cx)));
1544 (thread, agent)
1545 }
1546
1547 pub struct FakeAcpServer {
1548 connection: acp::ClientConnection,
1549 _io_task: Task<()>,
1550 on_user_message: Option<
1551 Rc<
1552 dyn Fn(
1553 acp::SendUserMessageParams,
1554 Entity<FakeAcpServer>,
1555 AsyncApp,
1556 ) -> LocalBoxFuture<'static, Result<()>>,
1557 >,
1558 >,
1559 }
1560
1561 #[derive(Clone)]
1562 struct FakeAgent {
1563 server: Entity<FakeAcpServer>,
1564 cx: AsyncApp,
1565 }
1566
1567 impl acp::Agent for FakeAgent {
1568 async fn initialize(&self) -> Result<acp::InitializeResponse> {
1569 Ok(acp::InitializeResponse {
1570 is_authenticated: true,
1571 })
1572 }
1573
1574 async fn authenticate(&self) -> Result<()> {
1575 Ok(())
1576 }
1577
1578 async fn cancel_send_message(&self) -> Result<()> {
1579 Ok(())
1580 }
1581
1582 async fn send_user_message(&self, request: acp::SendUserMessageParams) -> Result<()> {
1583 let mut cx = self.cx.clone();
1584 let handler = self
1585 .server
1586 .update(&mut cx, |server, _| server.on_user_message.clone())
1587 .ok()
1588 .flatten();
1589 if let Some(handler) = handler {
1590 handler(request, self.server.clone(), self.cx.clone()).await
1591 } else {
1592 anyhow::bail!("No handler for on_user_message")
1593 }
1594 }
1595 }
1596
1597 impl FakeAcpServer {
1598 fn new(stdin: PipeReader, stdout: PipeWriter, cx: &Context<Self>) -> Self {
1599 let agent = FakeAgent {
1600 server: cx.entity(),
1601 cx: cx.to_async(),
1602 };
1603 let foreground_executor = cx.foreground_executor().clone();
1604
1605 let (connection, io_fut) = acp::ClientConnection::connect_to_client(
1606 agent.clone(),
1607 stdout,
1608 stdin,
1609 move |fut| {
1610 foreground_executor.spawn(fut).detach();
1611 },
1612 );
1613 FakeAcpServer {
1614 connection: connection,
1615 on_user_message: None,
1616 _io_task: cx.background_spawn(async move {
1617 io_fut.await.log_err();
1618 }),
1619 }
1620 }
1621
1622 fn on_user_message<F>(
1623 &mut self,
1624 handler: impl for<'a> Fn(acp::SendUserMessageParams, Entity<FakeAcpServer>, AsyncApp) -> F
1625 + 'static,
1626 ) where
1627 F: Future<Output = Result<()>> + 'static,
1628 {
1629 self.on_user_message
1630 .replace(Rc::new(move |request, server, cx| {
1631 handler(request, server, cx).boxed_local()
1632 }));
1633 }
1634
1635 fn send_to_zed<T: acp::ClientRequest + 'static>(
1636 &self,
1637 message: T,
1638 ) -> BoxedLocal<Result<T::Response>> {
1639 self.connection
1640 .request(message)
1641 .map(|f| f.map_err(|err| anyhow!(err)))
1642 .boxed_local()
1643 }
1644 }
1645}