1pub use acp::ToolCallId;
2use agent_servers::AgentServer;
3use agentic_coding_protocol::{self as acp, UserMessageChunk};
4use anyhow::{Context as _, Result, anyhow};
5use buffer_diff::BufferDiff;
6use editor::{MultiBuffer, PathKey};
7use futures::{FutureExt, channel::oneshot, future::BoxFuture};
8use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
9use itertools::Itertools;
10use language::{Anchor, Buffer, Capability, LanguageRegistry, OffsetRangeExt as _};
11use markdown::Markdown;
12use project::Project;
13use std::error::Error;
14use std::fmt::{Formatter, Write};
15use std::{
16 fmt::Display,
17 mem,
18 path::{Path, PathBuf},
19 sync::Arc,
20};
21use ui::{App, IconName};
22use util::ResultExt;
23
24#[derive(Clone, Debug, Eq, PartialEq)]
25pub struct UserMessage {
26 pub content: Entity<Markdown>,
27}
28
29impl UserMessage {
30 pub fn from_acp(
31 message: acp::UserMessage,
32 language_registry: Arc<LanguageRegistry>,
33 cx: &mut App,
34 ) -> Self {
35 let mut md_source = String::new();
36
37 for chunk in message.chunks {
38 match chunk {
39 UserMessageChunk::Text { chunk } => md_source.push_str(&chunk),
40 UserMessageChunk::Path { path } => {
41 write!(&mut md_source, "{}", MentionPath(&path)).unwrap()
42 }
43 }
44 }
45
46 Self {
47 content: cx
48 .new(|cx| Markdown::new(md_source.into(), Some(language_registry), None, cx)),
49 }
50 }
51
52 fn to_markdown(&self, cx: &App) -> String {
53 format!("## User\n\n{}\n\n", self.content.read(cx).source())
54 }
55}
56
57#[derive(Debug)]
58pub struct MentionPath<'a>(&'a Path);
59
60impl<'a> MentionPath<'a> {
61 const PREFIX: &'static str = "@file:";
62
63 pub fn new(path: &'a Path) -> Self {
64 MentionPath(path)
65 }
66
67 pub fn try_parse(url: &'a str) -> Option<Self> {
68 let path = url.strip_prefix(Self::PREFIX)?;
69 Some(MentionPath(Path::new(path)))
70 }
71
72 pub fn path(&self) -> &Path {
73 self.0
74 }
75}
76
77impl Display for MentionPath<'_> {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 write!(
80 f,
81 "[@{}]({}{})",
82 self.0.file_name().unwrap_or_default().display(),
83 Self::PREFIX,
84 self.0.display()
85 )
86 }
87}
88
89#[derive(Clone, Debug, Eq, PartialEq)]
90pub struct AssistantMessage {
91 pub chunks: Vec<AssistantMessageChunk>,
92}
93
94impl AssistantMessage {
95 fn to_markdown(&self, cx: &App) -> String {
96 format!(
97 "## Assistant\n\n{}\n\n",
98 self.chunks
99 .iter()
100 .map(|chunk| chunk.to_markdown(cx))
101 .join("\n\n")
102 )
103 }
104}
105
106#[derive(Clone, Debug, Eq, PartialEq)]
107pub enum AssistantMessageChunk {
108 Text { chunk: Entity<Markdown> },
109 Thought { chunk: Entity<Markdown> },
110}
111
112impl AssistantMessageChunk {
113 pub fn from_acp(
114 chunk: acp::AssistantMessageChunk,
115 language_registry: Arc<LanguageRegistry>,
116 cx: &mut App,
117 ) -> Self {
118 match chunk {
119 acp::AssistantMessageChunk::Text { chunk } => Self::Text {
120 chunk: cx.new(|cx| Markdown::new(chunk.into(), Some(language_registry), None, cx)),
121 },
122 acp::AssistantMessageChunk::Thought { chunk } => Self::Thought {
123 chunk: cx.new(|cx| Markdown::new(chunk.into(), Some(language_registry), None, cx)),
124 },
125 }
126 }
127
128 pub fn from_str(chunk: &str, language_registry: Arc<LanguageRegistry>, cx: &mut App) -> Self {
129 Self::Text {
130 chunk: cx.new(|cx| {
131 Markdown::new(chunk.to_owned().into(), Some(language_registry), None, cx)
132 }),
133 }
134 }
135
136 fn to_markdown(&self, cx: &App) -> String {
137 match self {
138 Self::Text { chunk } => chunk.read(cx).source().to_string(),
139 Self::Thought { chunk } => {
140 format!("<thinking>\n{}\n</thinking>", chunk.read(cx).source())
141 }
142 }
143 }
144}
145
146#[derive(Debug)]
147pub enum AgentThreadEntry {
148 UserMessage(UserMessage),
149 AssistantMessage(AssistantMessage),
150 ToolCall(ToolCall),
151}
152
153impl AgentThreadEntry {
154 fn to_markdown(&self, cx: &App) -> String {
155 match self {
156 Self::UserMessage(message) => message.to_markdown(cx),
157 Self::AssistantMessage(message) => message.to_markdown(cx),
158 Self::ToolCall(too_call) => too_call.to_markdown(cx),
159 }
160 }
161}
162
163#[derive(Debug)]
164pub struct ToolCall {
165 pub id: acp::ToolCallId,
166 pub label: Entity<Markdown>,
167 pub icon: IconName,
168 pub content: Option<ToolCallContent>,
169 pub status: ToolCallStatus,
170}
171
172impl ToolCall {
173 fn to_markdown(&self, cx: &App) -> String {
174 let mut markdown = format!(
175 "**Tool Call: {}**\nStatus: {}\n\n",
176 self.label.read(cx).source(),
177 self.status
178 );
179 if let Some(content) = &self.content {
180 markdown.push_str(content.to_markdown(cx).as_str());
181 markdown.push_str("\n\n");
182 }
183 markdown
184 }
185}
186
187#[derive(Debug)]
188pub enum ToolCallStatus {
189 WaitingForConfirmation {
190 confirmation: ToolCallConfirmation,
191 respond_tx: oneshot::Sender<acp::ToolCallConfirmationOutcome>,
192 },
193 Allowed {
194 status: acp::ToolCallStatus,
195 },
196 Rejected,
197 Canceled,
198}
199
200impl Display for ToolCallStatus {
201 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
202 write!(
203 f,
204 "{}",
205 match self {
206 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
207 ToolCallStatus::Allowed { status } => match status {
208 acp::ToolCallStatus::Running => "Running",
209 acp::ToolCallStatus::Finished => "Finished",
210 acp::ToolCallStatus::Error => "Error",
211 },
212 ToolCallStatus::Rejected => "Rejected",
213 ToolCallStatus::Canceled => "Canceled",
214 }
215 )
216 }
217}
218
219#[derive(Debug)]
220pub enum ToolCallConfirmation {
221 Edit {
222 description: Option<Entity<Markdown>>,
223 },
224 Execute {
225 command: String,
226 root_command: String,
227 description: Option<Entity<Markdown>>,
228 },
229 Mcp {
230 server_name: String,
231 tool_name: String,
232 tool_display_name: String,
233 description: Option<Entity<Markdown>>,
234 },
235 Fetch {
236 urls: Vec<SharedString>,
237 description: Option<Entity<Markdown>>,
238 },
239 Other {
240 description: Entity<Markdown>,
241 },
242}
243
244impl ToolCallConfirmation {
245 pub fn from_acp(
246 confirmation: acp::ToolCallConfirmation,
247 language_registry: Arc<LanguageRegistry>,
248 cx: &mut App,
249 ) -> Self {
250 let to_md = |description: String, cx: &mut App| -> Entity<Markdown> {
251 cx.new(|cx| {
252 Markdown::new(
253 description.into(),
254 Some(language_registry.clone()),
255 None,
256 cx,
257 )
258 })
259 };
260
261 match confirmation {
262 acp::ToolCallConfirmation::Edit { description } => Self::Edit {
263 description: description.map(|description| to_md(description, cx)),
264 },
265 acp::ToolCallConfirmation::Execute {
266 command,
267 root_command,
268 description,
269 } => Self::Execute {
270 command,
271 root_command,
272 description: description.map(|description| to_md(description, cx)),
273 },
274 acp::ToolCallConfirmation::Mcp {
275 server_name,
276 tool_name,
277 tool_display_name,
278 description,
279 } => Self::Mcp {
280 server_name,
281 tool_name,
282 tool_display_name,
283 description: description.map(|description| to_md(description, cx)),
284 },
285 acp::ToolCallConfirmation::Fetch { urls, description } => Self::Fetch {
286 urls: urls.iter().map(|url| url.into()).collect(),
287 description: description.map(|description| to_md(description, cx)),
288 },
289 acp::ToolCallConfirmation::Other { description } => Self::Other {
290 description: to_md(description, cx),
291 },
292 }
293 }
294}
295
296#[derive(Debug)]
297pub enum ToolCallContent {
298 Markdown { markdown: Entity<Markdown> },
299 Diff { diff: Diff },
300}
301
302impl ToolCallContent {
303 pub fn from_acp(
304 content: acp::ToolCallContent,
305 language_registry: Arc<LanguageRegistry>,
306 cx: &mut App,
307 ) -> Self {
308 match content {
309 acp::ToolCallContent::Markdown { markdown } => Self::Markdown {
310 markdown: cx.new(|cx| Markdown::new_text(markdown.into(), cx)),
311 },
312 acp::ToolCallContent::Diff { diff } => Self::Diff {
313 diff: Diff::from_acp(diff, language_registry, cx),
314 },
315 }
316 }
317
318 fn to_markdown(&self, cx: &App) -> String {
319 match self {
320 Self::Markdown { markdown } => markdown.read(cx).source().to_string(),
321 Self::Diff { diff } => diff.to_markdown(cx),
322 }
323 }
324}
325
326#[derive(Debug)]
327pub struct Diff {
328 pub multibuffer: Entity<MultiBuffer>,
329 pub path: PathBuf,
330 _task: Task<Result<()>>,
331}
332
333impl Diff {
334 pub fn from_acp(
335 diff: acp::Diff,
336 language_registry: Arc<LanguageRegistry>,
337 cx: &mut App,
338 ) -> Self {
339 let acp::Diff {
340 path,
341 old_text,
342 new_text,
343 } = diff;
344
345 let multibuffer = cx.new(|_cx| MultiBuffer::without_headers(Capability::ReadOnly));
346
347 let new_buffer = cx.new(|cx| Buffer::local(new_text, cx));
348 let old_buffer = cx.new(|cx| Buffer::local(old_text.unwrap_or("".into()), cx));
349 let new_buffer_snapshot = new_buffer.read(cx).text_snapshot();
350 let old_buffer_snapshot = old_buffer.read(cx).snapshot();
351 let buffer_diff = cx.new(|cx| BufferDiff::new(&new_buffer_snapshot, cx));
352 let diff_task = buffer_diff.update(cx, |diff, cx| {
353 diff.set_base_text(
354 old_buffer_snapshot,
355 Some(language_registry.clone()),
356 new_buffer_snapshot,
357 cx,
358 )
359 });
360
361 let task = cx.spawn({
362 let multibuffer = multibuffer.clone();
363 let path = path.clone();
364 async move |cx| {
365 diff_task.await?;
366
367 multibuffer
368 .update(cx, |multibuffer, cx| {
369 let hunk_ranges = {
370 let buffer = new_buffer.read(cx);
371 let diff = buffer_diff.read(cx);
372 diff.hunks_intersecting_range(Anchor::MIN..Anchor::MAX, &buffer, cx)
373 .map(|diff_hunk| diff_hunk.buffer_range.to_point(&buffer))
374 .collect::<Vec<_>>()
375 };
376
377 multibuffer.set_excerpts_for_path(
378 PathKey::for_buffer(&new_buffer, cx),
379 new_buffer.clone(),
380 hunk_ranges,
381 editor::DEFAULT_MULTIBUFFER_CONTEXT,
382 cx,
383 );
384 multibuffer.add_diff(buffer_diff.clone(), cx);
385 })
386 .log_err();
387
388 if let Some(language) = language_registry
389 .language_for_file_path(&path)
390 .await
391 .log_err()
392 {
393 new_buffer.update(cx, |buffer, cx| buffer.set_language(Some(language), cx))?;
394 }
395
396 anyhow::Ok(())
397 }
398 });
399
400 Self {
401 multibuffer,
402 path,
403 _task: task,
404 }
405 }
406
407 fn to_markdown(&self, cx: &App) -> String {
408 let buffer_text = self
409 .multibuffer
410 .read(cx)
411 .all_buffers()
412 .iter()
413 .map(|buffer| buffer.read(cx).text())
414 .join("\n");
415 format!("Diff: {}\n```\n{}\n```\n", self.path.display(), buffer_text)
416 }
417}
418
419pub struct AcpThread {
420 entries: Vec<AgentThreadEntry>,
421 title: SharedString,
422 project: Entity<Project>,
423 send_task: Option<Task<()>>,
424 connection: Arc<acp::AgentConnection>,
425 child_status: Option<Task<Result<()>>>,
426 _io_task: Task<()>,
427}
428
429pub enum AcpThreadEvent {
430 NewEntry,
431 EntryUpdated(usize),
432}
433
434impl EventEmitter<AcpThreadEvent> for AcpThread {}
435
436#[derive(PartialEq, Eq)]
437pub enum ThreadStatus {
438 Idle,
439 WaitingForToolConfirmation,
440 Generating,
441}
442
443#[derive(Debug, Clone)]
444pub enum LoadError {
445 Unsupported { current_version: SharedString },
446 Exited(i32),
447 Other(SharedString),
448}
449
450impl Display for LoadError {
451 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
452 match self {
453 LoadError::Unsupported { current_version } => {
454 write!(
455 f,
456 "Your installed version of Gemini {} doesn't support the Agentic Coding Protocol (ACP).",
457 current_version
458 )
459 }
460 LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
461 LoadError::Other(msg) => write!(f, "{}", msg),
462 }
463 }
464}
465
466impl Error for LoadError {}
467
468impl AcpThread {
469 pub async fn spawn(
470 server: impl AgentServer + 'static,
471 root_dir: &Path,
472 project: Entity<Project>,
473 cx: &mut AsyncApp,
474 ) -> Result<Entity<Self>> {
475 let command = match server.command(&project, cx).await {
476 Ok(command) => command,
477 Err(e) => return Err(anyhow!(LoadError::Other(format!("{e}").into()))),
478 };
479
480 let mut child = util::command::new_smol_command(&command.path)
481 .args(command.args.iter())
482 .current_dir(root_dir)
483 .stdin(std::process::Stdio::piped())
484 .stdout(std::process::Stdio::piped())
485 .stderr(std::process::Stdio::inherit())
486 .kill_on_drop(true)
487 .spawn()?;
488
489 let stdin = child.stdin.take().unwrap();
490 let stdout = child.stdout.take().unwrap();
491
492 cx.new(|cx| {
493 let foreground_executor = cx.foreground_executor().clone();
494
495 let (connection, io_fut) = acp::AgentConnection::connect_to_agent(
496 AcpClientDelegate::new(cx.entity().downgrade(), cx.to_async()),
497 stdin,
498 stdout,
499 move |fut| foreground_executor.spawn(fut).detach(),
500 );
501
502 let io_task = cx.background_spawn(async move {
503 io_fut.await.log_err();
504 });
505
506 let child_status = cx.background_spawn(async move {
507 match child.status().await {
508 Err(e) => Err(anyhow!(e)),
509 Ok(result) if result.success() => Ok(()),
510 Ok(result) => {
511 if let Some(version) = server.version(&command).await.log_err()
512 && !version.supported
513 {
514 Err(anyhow!(LoadError::Unsupported {
515 current_version: version.current_version
516 }))
517 } else {
518 Err(anyhow!(LoadError::Exited(result.code().unwrap_or(-127))))
519 }
520 }
521 }
522 });
523
524 Self {
525 entries: Default::default(),
526 title: "ACP Thread".into(),
527 project,
528 send_task: None,
529 connection: Arc::new(connection),
530 child_status: Some(child_status),
531 _io_task: io_task,
532 }
533 })
534 }
535
536 #[cfg(test)]
537 pub fn fake(
538 stdin: async_pipe::PipeWriter,
539 stdout: async_pipe::PipeReader,
540 project: Entity<Project>,
541 cx: &mut Context<Self>,
542 ) -> Self {
543 let foreground_executor = cx.foreground_executor().clone();
544
545 let (connection, io_fut) = acp::AgentConnection::connect_to_agent(
546 AcpClientDelegate::new(cx.entity().downgrade(), cx.to_async()),
547 stdin,
548 stdout,
549 move |fut| {
550 foreground_executor.spawn(fut).detach();
551 },
552 );
553
554 let io_task = cx.background_spawn({
555 async move {
556 io_fut.await.log_err();
557 }
558 });
559
560 Self {
561 entries: Default::default(),
562 title: "ACP Thread".into(),
563 project,
564 send_task: None,
565 connection: Arc::new(connection),
566 child_status: None,
567 _io_task: io_task,
568 }
569 }
570
571 pub fn title(&self) -> SharedString {
572 self.title.clone()
573 }
574
575 pub fn entries(&self) -> &[AgentThreadEntry] {
576 &self.entries
577 }
578
579 pub fn status(&self) -> ThreadStatus {
580 if self.send_task.is_some() {
581 if self.waiting_for_tool_confirmation() {
582 ThreadStatus::WaitingForToolConfirmation
583 } else {
584 ThreadStatus::Generating
585 }
586 } else {
587 ThreadStatus::Idle
588 }
589 }
590
591 pub fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
592 self.entries.push(entry);
593 cx.emit(AcpThreadEvent::NewEntry);
594 }
595
596 pub fn push_assistant_chunk(
597 &mut self,
598 chunk: acp::AssistantMessageChunk,
599 cx: &mut Context<Self>,
600 ) {
601 let entries_len = self.entries.len();
602 if let Some(last_entry) = self.entries.last_mut()
603 && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
604 {
605 cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
606
607 match (chunks.last_mut(), &chunk) {
608 (
609 Some(AssistantMessageChunk::Text { chunk: old_chunk }),
610 acp::AssistantMessageChunk::Text { chunk: new_chunk },
611 )
612 | (
613 Some(AssistantMessageChunk::Thought { chunk: old_chunk }),
614 acp::AssistantMessageChunk::Thought { chunk: new_chunk },
615 ) => {
616 old_chunk.update(cx, |old_chunk, cx| {
617 old_chunk.append(&new_chunk, cx);
618 });
619 }
620 _ => {
621 chunks.push(AssistantMessageChunk::from_acp(
622 chunk,
623 self.project.read(cx).languages().clone(),
624 cx,
625 ));
626 }
627 }
628 } else {
629 let chunk = AssistantMessageChunk::from_acp(
630 chunk,
631 self.project.read(cx).languages().clone(),
632 cx,
633 );
634
635 self.push_entry(
636 AgentThreadEntry::AssistantMessage(AssistantMessage {
637 chunks: vec![chunk],
638 }),
639 cx,
640 );
641 }
642 }
643
644 pub fn request_tool_call(
645 &mut self,
646 label: String,
647 icon: acp::Icon,
648 content: Option<acp::ToolCallContent>,
649 confirmation: acp::ToolCallConfirmation,
650 cx: &mut Context<Self>,
651 ) -> ToolCallRequest {
652 let (tx, rx) = oneshot::channel();
653
654 let status = ToolCallStatus::WaitingForConfirmation {
655 confirmation: ToolCallConfirmation::from_acp(
656 confirmation,
657 self.project.read(cx).languages().clone(),
658 cx,
659 ),
660 respond_tx: tx,
661 };
662
663 let id = self.insert_tool_call(label, status, icon, content, cx);
664 ToolCallRequest { id, outcome: rx }
665 }
666
667 pub fn push_tool_call(
668 &mut self,
669 label: String,
670 icon: acp::Icon,
671 content: Option<acp::ToolCallContent>,
672 cx: &mut Context<Self>,
673 ) -> acp::ToolCallId {
674 let status = ToolCallStatus::Allowed {
675 status: acp::ToolCallStatus::Running,
676 };
677
678 self.insert_tool_call(label, status, icon, content, cx)
679 }
680
681 fn insert_tool_call(
682 &mut self,
683 label: String,
684 status: ToolCallStatus,
685 icon: acp::Icon,
686 content: Option<acp::ToolCallContent>,
687 cx: &mut Context<Self>,
688 ) -> acp::ToolCallId {
689 let language_registry = self.project.read(cx).languages().clone();
690 let id = acp::ToolCallId(self.entries.len() as u64);
691
692 self.push_entry(
693 AgentThreadEntry::ToolCall(ToolCall {
694 id,
695 label: cx.new(|cx| {
696 Markdown::new(label.into(), Some(language_registry.clone()), None, cx)
697 }),
698 icon: acp_icon_to_ui_icon(icon),
699 content: content
700 .map(|content| ToolCallContent::from_acp(content, language_registry, cx)),
701 status,
702 }),
703 cx,
704 );
705
706 id
707 }
708
709 pub fn authorize_tool_call(
710 &mut self,
711 id: acp::ToolCallId,
712 outcome: acp::ToolCallConfirmationOutcome,
713 cx: &mut Context<Self>,
714 ) {
715 let Some((ix, call)) = self.tool_call_mut(id) else {
716 return;
717 };
718
719 let new_status = if outcome == acp::ToolCallConfirmationOutcome::Reject {
720 ToolCallStatus::Rejected
721 } else {
722 ToolCallStatus::Allowed {
723 status: acp::ToolCallStatus::Running,
724 }
725 };
726
727 let curr_status = mem::replace(&mut call.status, new_status);
728
729 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
730 respond_tx.send(outcome).log_err();
731 } else if cfg!(debug_assertions) {
732 panic!("tried to authorize an already authorized tool call");
733 }
734
735 cx.emit(AcpThreadEvent::EntryUpdated(ix));
736 }
737
738 pub fn update_tool_call(
739 &mut self,
740 id: acp::ToolCallId,
741 new_status: acp::ToolCallStatus,
742 new_content: Option<acp::ToolCallContent>,
743 cx: &mut Context<Self>,
744 ) -> Result<()> {
745 let language_registry = self.project.read(cx).languages().clone();
746 let (ix, call) = self.tool_call_mut(id).context("Entry not found")?;
747
748 call.content = new_content
749 .map(|new_content| ToolCallContent::from_acp(new_content, language_registry, cx));
750
751 match &mut call.status {
752 ToolCallStatus::Allowed { status } => {
753 *status = new_status;
754 }
755 ToolCallStatus::WaitingForConfirmation { .. } => {
756 anyhow::bail!("Tool call hasn't been authorized yet")
757 }
758 ToolCallStatus::Rejected => {
759 anyhow::bail!("Tool call was rejected and therefore can't be updated")
760 }
761 ToolCallStatus::Canceled => {
762 call.status = ToolCallStatus::Allowed { status: new_status };
763 }
764 }
765
766 cx.emit(AcpThreadEvent::EntryUpdated(ix));
767 Ok(())
768 }
769
770 fn tool_call_mut(&mut self, id: acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
771 let entry = self.entries.get_mut(id.0 as usize);
772 debug_assert!(
773 entry.is_some(),
774 "We shouldn't give out ids to entries that don't exist"
775 );
776 match entry {
777 Some(AgentThreadEntry::ToolCall(call)) if call.id == id => Some((id.0 as usize, call)),
778 _ => {
779 if cfg!(debug_assertions) {
780 panic!("entry is not a tool call");
781 }
782 None
783 }
784 }
785 }
786
787 /// Returns true if the last turn is awaiting tool authorization
788 pub fn waiting_for_tool_confirmation(&self) -> bool {
789 for entry in self.entries.iter().rev() {
790 match &entry {
791 AgentThreadEntry::ToolCall(call) => match call.status {
792 ToolCallStatus::WaitingForConfirmation { .. } => return true,
793 ToolCallStatus::Allowed { .. }
794 | ToolCallStatus::Rejected
795 | ToolCallStatus::Canceled => continue,
796 },
797 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
798 // Reached the beginning of the turn
799 return false;
800 }
801 }
802 }
803 false
804 }
805
806 pub fn initialize(&self) -> impl use<> + Future<Output = Result<acp::InitializeResponse>> {
807 let connection = self.connection.clone();
808 async move { Ok(connection.request(acp::InitializeParams).await?) }
809 }
810
811 pub fn authenticate(&self) -> impl use<> + Future<Output = Result<()>> {
812 let connection = self.connection.clone();
813 async move { Ok(connection.request(acp::AuthenticateParams).await?) }
814 }
815
816 pub fn send(
817 &mut self,
818 message: impl Into<acp::UserMessage>,
819 cx: &mut Context<Self>,
820 ) -> BoxFuture<'static, Result<()>> {
821 let agent = self.connection.clone();
822 let message = message.into();
823 self.push_entry(
824 AgentThreadEntry::UserMessage(UserMessage::from_acp(
825 message.clone(),
826 self.project.read(cx).languages().clone(),
827 cx,
828 )),
829 cx,
830 );
831
832 let (tx, rx) = oneshot::channel();
833 let cancel = self.cancel(cx);
834
835 self.send_task = Some(cx.spawn(async move |this, cx| {
836 cancel.await.log_err();
837
838 let result = agent.request(acp::SendUserMessageParams { message }).await;
839 tx.send(result).log_err();
840 this.update(cx, |this, _cx| this.send_task.take()).log_err();
841 }));
842
843 async move {
844 match rx.await {
845 Ok(Err(e)) => Err(e)?,
846 _ => Ok(()),
847 }
848 }
849 .boxed()
850 }
851
852 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
853 let agent = self.connection.clone();
854
855 if self.send_task.take().is_some() {
856 cx.spawn(async move |this, cx| {
857 agent.request(acp::CancelSendMessageParams).await?;
858
859 this.update(cx, |this, _cx| {
860 for entry in this.entries.iter_mut() {
861 if let AgentThreadEntry::ToolCall(call) = entry {
862 let cancel = matches!(
863 call.status,
864 ToolCallStatus::WaitingForConfirmation { .. }
865 | ToolCallStatus::Allowed {
866 status: acp::ToolCallStatus::Running
867 }
868 );
869
870 if cancel {
871 let curr_status =
872 mem::replace(&mut call.status, ToolCallStatus::Canceled);
873
874 if let ToolCallStatus::WaitingForConfirmation {
875 respond_tx, ..
876 } = curr_status
877 {
878 respond_tx
879 .send(acp::ToolCallConfirmationOutcome::Cancel)
880 .ok();
881 }
882 }
883 }
884 }
885 })
886 })
887 } else {
888 Task::ready(Ok(()))
889 }
890 }
891
892 pub fn child_status(&mut self) -> Option<Task<Result<()>>> {
893 self.child_status.take()
894 }
895
896 pub fn to_markdown(&self, cx: &App) -> String {
897 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
898 }
899}
900
901struct AcpClientDelegate {
902 thread: WeakEntity<AcpThread>,
903 cx: AsyncApp,
904 // sent_buffer_versions: HashMap<Entity<Buffer>, HashMap<u64, BufferSnapshot>>,
905}
906
907impl AcpClientDelegate {
908 fn new(thread: WeakEntity<AcpThread>, cx: AsyncApp) -> Self {
909 Self { thread, cx }
910 }
911}
912
913impl acp::Client for AcpClientDelegate {
914 async fn stream_assistant_message_chunk(
915 &self,
916 params: acp::StreamAssistantMessageChunkParams,
917 ) -> Result<()> {
918 let cx = &mut self.cx.clone();
919
920 cx.update(|cx| {
921 self.thread
922 .update(cx, |thread, cx| {
923 thread.push_assistant_chunk(params.chunk, cx)
924 })
925 .ok();
926 })?;
927
928 Ok(())
929 }
930
931 async fn request_tool_call_confirmation(
932 &self,
933 request: acp::RequestToolCallConfirmationParams,
934 ) -> Result<acp::RequestToolCallConfirmationResponse> {
935 let cx = &mut self.cx.clone();
936 let ToolCallRequest { id, outcome } = cx
937 .update(|cx| {
938 self.thread.update(cx, |thread, cx| {
939 thread.request_tool_call(
940 request.label,
941 request.icon,
942 request.content,
943 request.confirmation,
944 cx,
945 )
946 })
947 })?
948 .context("Failed to update thread")?;
949
950 Ok(acp::RequestToolCallConfirmationResponse {
951 id,
952 outcome: outcome.await?,
953 })
954 }
955
956 async fn push_tool_call(
957 &self,
958 request: acp::PushToolCallParams,
959 ) -> Result<acp::PushToolCallResponse> {
960 let cx = &mut self.cx.clone();
961 let id = cx
962 .update(|cx| {
963 self.thread.update(cx, |thread, cx| {
964 thread.push_tool_call(request.label, request.icon, request.content, cx)
965 })
966 })?
967 .context("Failed to update thread")?;
968
969 Ok(acp::PushToolCallResponse { id })
970 }
971
972 async fn update_tool_call(&self, request: acp::UpdateToolCallParams) -> Result<()> {
973 let cx = &mut self.cx.clone();
974
975 cx.update(|cx| {
976 self.thread.update(cx, |thread, cx| {
977 thread.update_tool_call(request.tool_call_id, request.status, request.content, cx)
978 })
979 })?
980 .context("Failed to update thread")??;
981
982 Ok(())
983 }
984}
985
986fn acp_icon_to_ui_icon(icon: acp::Icon) -> IconName {
987 match icon {
988 acp::Icon::FileSearch => IconName::ToolSearch,
989 acp::Icon::Folder => IconName::ToolFolder,
990 acp::Icon::Globe => IconName::ToolWeb,
991 acp::Icon::Hammer => IconName::ToolHammer,
992 acp::Icon::LightBulb => IconName::ToolBulb,
993 acp::Icon::Pencil => IconName::ToolPencil,
994 acp::Icon::Regex => IconName::ToolRegex,
995 acp::Icon::Terminal => IconName::ToolTerminal,
996 }
997}
998
999pub struct ToolCallRequest {
1000 pub id: acp::ToolCallId,
1001 pub outcome: oneshot::Receiver<acp::ToolCallConfirmationOutcome>,
1002}
1003
1004#[cfg(test)]
1005mod tests {
1006 use super::*;
1007 use agent_servers::{AgentServerCommand, AgentServerVersion};
1008 use async_pipe::{PipeReader, PipeWriter};
1009 use futures::{channel::mpsc, future::LocalBoxFuture, select};
1010 use gpui::{AsyncApp, TestAppContext};
1011 use indoc::indoc;
1012 use project::FakeFs;
1013 use serde_json::json;
1014 use settings::SettingsStore;
1015 use smol::{future::BoxedLocal, stream::StreamExt as _};
1016 use std::{cell::RefCell, env, path::Path, rc::Rc, time::Duration};
1017 use util::path;
1018
1019 fn init_test(cx: &mut TestAppContext) {
1020 env_logger::try_init().ok();
1021 cx.update(|cx| {
1022 let settings_store = SettingsStore::test(cx);
1023 cx.set_global(settings_store);
1024 Project::init_settings(cx);
1025 language::init(cx);
1026 });
1027 }
1028
1029 #[gpui::test]
1030 async fn test_thinking_concatenation(cx: &mut TestAppContext) {
1031 init_test(cx);
1032
1033 cx.executor().allow_parking();
1034
1035 let fs = FakeFs::new(cx.executor());
1036 let project = Project::test(fs, [], cx).await;
1037 let (thread, fake_server) = fake_acp_thread(project, cx);
1038
1039 fake_server.update(cx, |fake_server, _| {
1040 fake_server.on_user_message(move |_, server, mut cx| async move {
1041 server
1042 .update(&mut cx, |server, _| {
1043 server.send_to_zed(acp::StreamAssistantMessageChunkParams {
1044 chunk: acp::AssistantMessageChunk::Thought {
1045 chunk: "Thinking ".into(),
1046 },
1047 })
1048 })?
1049 .await
1050 .unwrap();
1051 server
1052 .update(&mut cx, |server, _| {
1053 server.send_to_zed(acp::StreamAssistantMessageChunkParams {
1054 chunk: acp::AssistantMessageChunk::Thought {
1055 chunk: "hard!".into(),
1056 },
1057 })
1058 })?
1059 .await
1060 .unwrap();
1061
1062 Ok(())
1063 })
1064 });
1065
1066 thread
1067 .update(cx, |thread, cx| thread.send("Hello from Zed!", cx))
1068 .await
1069 .unwrap();
1070
1071 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
1072 assert_eq!(
1073 output,
1074 indoc! {r#"
1075 ## User
1076
1077 Hello from Zed!
1078
1079 ## Assistant
1080
1081 <thinking>
1082 Thinking hard!
1083 </thinking>
1084
1085 "#}
1086 );
1087 }
1088
1089 #[gpui::test]
1090 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
1091 init_test(cx);
1092
1093 let fs = FakeFs::new(cx.executor());
1094 let project = Project::test(fs, [], cx).await;
1095 let (thread, fake_server) = fake_acp_thread(project, cx);
1096
1097 let (end_turn_tx, end_turn_rx) = oneshot::channel::<()>();
1098
1099 let tool_call_id = Rc::new(RefCell::new(None));
1100 let end_turn_rx = Rc::new(RefCell::new(Some(end_turn_rx)));
1101 fake_server.update(cx, |fake_server, _| {
1102 let tool_call_id = tool_call_id.clone();
1103 fake_server.on_user_message(move |_, server, mut cx| {
1104 let end_turn_rx = end_turn_rx.clone();
1105 let tool_call_id = tool_call_id.clone();
1106 async move {
1107 let tool_call_result = server
1108 .update(&mut cx, |server, _| {
1109 server.send_to_zed(acp::PushToolCallParams {
1110 label: "Fetch".to_string(),
1111 icon: acp::Icon::Globe,
1112 content: None,
1113 })
1114 })?
1115 .await
1116 .unwrap();
1117 *tool_call_id.clone().borrow_mut() = Some(tool_call_result.id);
1118 end_turn_rx.take().unwrap().await.ok();
1119
1120 Ok(())
1121 }
1122 })
1123 });
1124
1125 let request = thread.update(cx, |thread, cx| {
1126 thread.send("Fetch https://example.com", cx)
1127 });
1128
1129 run_until_first_tool_call(&thread, cx).await;
1130
1131 thread.read_with(cx, |thread, _| {
1132 assert!(matches!(
1133 thread.entries[1],
1134 AgentThreadEntry::ToolCall(ToolCall {
1135 status: ToolCallStatus::Allowed {
1136 status: acp::ToolCallStatus::Running,
1137 ..
1138 },
1139 ..
1140 })
1141 ));
1142 });
1143
1144 cx.run_until_parked();
1145
1146 thread
1147 .update(cx, |thread, cx| thread.cancel(cx))
1148 .await
1149 .unwrap();
1150
1151 thread.read_with(cx, |thread, _| {
1152 assert!(matches!(
1153 &thread.entries[1],
1154 AgentThreadEntry::ToolCall(ToolCall {
1155 status: ToolCallStatus::Canceled,
1156 ..
1157 })
1158 ));
1159 });
1160
1161 fake_server
1162 .update(cx, |fake_server, _| {
1163 fake_server.send_to_zed(acp::UpdateToolCallParams {
1164 tool_call_id: tool_call_id.borrow().unwrap(),
1165 status: acp::ToolCallStatus::Finished,
1166 content: None,
1167 })
1168 })
1169 .await
1170 .unwrap();
1171
1172 drop(end_turn_tx);
1173 request.await.unwrap();
1174
1175 thread.read_with(cx, |thread, _| {
1176 assert!(matches!(
1177 thread.entries[1],
1178 AgentThreadEntry::ToolCall(ToolCall {
1179 status: ToolCallStatus::Allowed {
1180 status: acp::ToolCallStatus::Finished,
1181 ..
1182 },
1183 ..
1184 })
1185 ));
1186 });
1187 }
1188
1189 #[gpui::test]
1190 #[cfg_attr(not(feature = "gemini"), ignore)]
1191 async fn test_gemini_basic(cx: &mut TestAppContext) {
1192 init_test(cx);
1193
1194 cx.executor().allow_parking();
1195
1196 let fs = FakeFs::new(cx.executor());
1197 let project = Project::test(fs, [], cx).await;
1198 let thread = gemini_acp_thread(project.clone(), "/private/tmp", cx).await;
1199 thread
1200 .update(cx, |thread, cx| thread.send("Hello from Zed!", cx))
1201 .await
1202 .unwrap();
1203
1204 thread.read_with(cx, |thread, _| {
1205 assert_eq!(thread.entries.len(), 2);
1206 assert!(matches!(
1207 thread.entries[0],
1208 AgentThreadEntry::UserMessage(_)
1209 ));
1210 assert!(matches!(
1211 thread.entries[1],
1212 AgentThreadEntry::AssistantMessage(_)
1213 ));
1214 });
1215 }
1216
1217 #[gpui::test]
1218 #[cfg_attr(not(feature = "gemini"), ignore)]
1219 async fn test_gemini_path_mentions(cx: &mut TestAppContext) {
1220 init_test(cx);
1221
1222 cx.executor().allow_parking();
1223 let tempdir = tempfile::tempdir().unwrap();
1224 std::fs::write(
1225 tempdir.path().join("foo.rs"),
1226 indoc! {"
1227 fn main() {
1228 println!(\"Hello, world!\");
1229 }
1230 "},
1231 )
1232 .expect("failed to write file");
1233 let project = Project::example([tempdir.path()], &mut cx.to_async()).await;
1234 let thread = gemini_acp_thread(project.clone(), tempdir.path(), cx).await;
1235 thread
1236 .update(cx, |thread, cx| {
1237 thread.send(
1238 acp::UserMessage {
1239 chunks: vec![
1240 "Read the file ".into(),
1241 Path::new("foo.rs").into(),
1242 " and tell me what the content of the println! is".into(),
1243 ],
1244 },
1245 cx,
1246 )
1247 })
1248 .await
1249 .unwrap();
1250
1251 thread.read_with(cx, |thread, cx| {
1252 assert_eq!(thread.entries.len(), 3);
1253 assert!(matches!(
1254 thread.entries[0],
1255 AgentThreadEntry::UserMessage(_)
1256 ));
1257 assert!(matches!(thread.entries[1], AgentThreadEntry::ToolCall(_)));
1258 let AgentThreadEntry::AssistantMessage(assistant_message) = &thread.entries[2] else {
1259 panic!("Expected AssistantMessage")
1260 };
1261 assert!(
1262 assistant_message.to_markdown(cx).contains("Hello, world!"),
1263 "unexpected assistant message: {:?}",
1264 assistant_message.to_markdown(cx)
1265 );
1266 });
1267 }
1268
1269 #[gpui::test]
1270 #[cfg_attr(not(feature = "gemini"), ignore)]
1271 async fn test_gemini_tool_call(cx: &mut TestAppContext) {
1272 init_test(cx);
1273
1274 cx.executor().allow_parking();
1275
1276 let fs = FakeFs::new(cx.executor());
1277 fs.insert_tree(
1278 path!("/private/tmp"),
1279 json!({"foo": "Lorem ipsum dolor", "bar": "bar", "baz": "baz"}),
1280 )
1281 .await;
1282 let project = Project::test(fs, [path!("/private/tmp").as_ref()], cx).await;
1283 let thread = gemini_acp_thread(project.clone(), "/private/tmp", cx).await;
1284 thread
1285 .update(cx, |thread, cx| {
1286 thread.send(
1287 "Read the '/private/tmp/foo' file and tell me what you see.",
1288 cx,
1289 )
1290 })
1291 .await
1292 .unwrap();
1293 thread.read_with(cx, |thread, _cx| {
1294 assert!(matches!(
1295 &thread.entries()[2],
1296 AgentThreadEntry::ToolCall(ToolCall {
1297 status: ToolCallStatus::Allowed { .. },
1298 ..
1299 })
1300 ));
1301
1302 assert!(matches!(
1303 thread.entries[3],
1304 AgentThreadEntry::AssistantMessage(_)
1305 ));
1306 });
1307 }
1308
1309 #[gpui::test]
1310 #[cfg_attr(not(feature = "gemini"), ignore)]
1311 async fn test_gemini_tool_call_with_confirmation(cx: &mut TestAppContext) {
1312 init_test(cx);
1313
1314 cx.executor().allow_parking();
1315
1316 let fs = FakeFs::new(cx.executor());
1317 let project = Project::test(fs, [path!("/private/tmp").as_ref()], cx).await;
1318 let thread = gemini_acp_thread(project.clone(), "/private/tmp", cx).await;
1319 let full_turn = thread.update(cx, |thread, cx| {
1320 thread.send(r#"Run `echo "Hello, world!"`"#, cx)
1321 });
1322
1323 run_until_first_tool_call(&thread, cx).await;
1324
1325 let tool_call_id = thread.read_with(cx, |thread, _cx| {
1326 let AgentThreadEntry::ToolCall(ToolCall {
1327 id,
1328 status:
1329 ToolCallStatus::WaitingForConfirmation {
1330 confirmation: ToolCallConfirmation::Execute { root_command, .. },
1331 ..
1332 },
1333 ..
1334 }) = &thread.entries()[2]
1335 else {
1336 panic!();
1337 };
1338
1339 assert_eq!(root_command, "echo");
1340
1341 *id
1342 });
1343
1344 thread.update(cx, |thread, cx| {
1345 thread.authorize_tool_call(tool_call_id, acp::ToolCallConfirmationOutcome::Allow, cx);
1346
1347 assert!(matches!(
1348 &thread.entries()[2],
1349 AgentThreadEntry::ToolCall(ToolCall {
1350 status: ToolCallStatus::Allowed { .. },
1351 ..
1352 })
1353 ));
1354 });
1355
1356 full_turn.await.unwrap();
1357
1358 thread.read_with(cx, |thread, cx| {
1359 let AgentThreadEntry::ToolCall(ToolCall {
1360 content: Some(ToolCallContent::Markdown { markdown }),
1361 status: ToolCallStatus::Allowed { .. },
1362 ..
1363 }) = &thread.entries()[2]
1364 else {
1365 panic!();
1366 };
1367
1368 markdown.read_with(cx, |md, _cx| {
1369 assert!(
1370 md.source().contains("Hello, world!"),
1371 r#"Expected '{}' to contain "Hello, world!""#,
1372 md.source()
1373 );
1374 });
1375 });
1376 }
1377
1378 #[gpui::test]
1379 #[cfg_attr(not(feature = "gemini"), ignore)]
1380 async fn test_gemini_cancel(cx: &mut TestAppContext) {
1381 init_test(cx);
1382
1383 cx.executor().allow_parking();
1384
1385 let fs = FakeFs::new(cx.executor());
1386 let project = Project::test(fs, [path!("/private/tmp").as_ref()], cx).await;
1387 let thread = gemini_acp_thread(project.clone(), "/private/tmp", cx).await;
1388 let full_turn = thread.update(cx, |thread, cx| {
1389 thread.send(r#"Run `echo "Hello, world!"`"#, cx)
1390 });
1391
1392 let first_tool_call_ix = run_until_first_tool_call(&thread, cx).await;
1393
1394 thread.read_with(cx, |thread, _cx| {
1395 let AgentThreadEntry::ToolCall(ToolCall {
1396 id,
1397 status:
1398 ToolCallStatus::WaitingForConfirmation {
1399 confirmation: ToolCallConfirmation::Execute { root_command, .. },
1400 ..
1401 },
1402 ..
1403 }) = &thread.entries()[first_tool_call_ix]
1404 else {
1405 panic!("{:?}", thread.entries()[1]);
1406 };
1407
1408 assert_eq!(root_command, "echo");
1409
1410 *id
1411 });
1412
1413 thread
1414 .update(cx, |thread, cx| thread.cancel(cx))
1415 .await
1416 .unwrap();
1417 full_turn.await.unwrap();
1418 thread.read_with(cx, |thread, _| {
1419 let AgentThreadEntry::ToolCall(ToolCall {
1420 status: ToolCallStatus::Canceled,
1421 ..
1422 }) = &thread.entries()[first_tool_call_ix]
1423 else {
1424 panic!();
1425 };
1426 });
1427
1428 thread
1429 .update(cx, |thread, cx| {
1430 thread.send(r#"Stop running and say goodbye to me."#, cx)
1431 })
1432 .await
1433 .unwrap();
1434 thread.read_with(cx, |thread, _| {
1435 assert!(matches!(
1436 &thread.entries().last().unwrap(),
1437 AgentThreadEntry::AssistantMessage(..),
1438 ))
1439 });
1440 }
1441
1442 async fn run_until_first_tool_call(
1443 thread: &Entity<AcpThread>,
1444 cx: &mut TestAppContext,
1445 ) -> usize {
1446 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
1447
1448 let subscription = cx.update(|cx| {
1449 cx.subscribe(thread, move |thread, _, cx| {
1450 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
1451 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
1452 return tx.try_send(ix).unwrap();
1453 }
1454 }
1455 })
1456 });
1457
1458 select! {
1459 _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
1460 panic!("Timeout waiting for tool call")
1461 }
1462 ix = rx.next().fuse() => {
1463 drop(subscription);
1464 ix.unwrap()
1465 }
1466 }
1467 }
1468
1469 pub async fn gemini_acp_thread(
1470 project: Entity<Project>,
1471 current_dir: impl AsRef<Path>,
1472 cx: &mut TestAppContext,
1473 ) -> Entity<AcpThread> {
1474 struct DevGemini;
1475
1476 impl agent_servers::AgentServer for DevGemini {
1477 async fn command(
1478 &self,
1479 _project: &Entity<Project>,
1480 _cx: &mut AsyncApp,
1481 ) -> Result<agent_servers::AgentServerCommand> {
1482 let cli_path = Path::new(env!("CARGO_MANIFEST_DIR"))
1483 .join("../../../gemini-cli/packages/cli")
1484 .to_string_lossy()
1485 .to_string();
1486
1487 Ok(AgentServerCommand {
1488 path: "node".into(),
1489 args: vec![cli_path, "--acp".into()],
1490 env: None,
1491 })
1492 }
1493
1494 async fn version(
1495 &self,
1496 _command: &agent_servers::AgentServerCommand,
1497 ) -> Result<AgentServerVersion> {
1498 Ok(AgentServerVersion {
1499 current_version: "0.1.0".into(),
1500 supported: true,
1501 })
1502 }
1503 }
1504
1505 let thread = AcpThread::spawn(DevGemini, current_dir.as_ref(), project, &mut cx.to_async())
1506 .await
1507 .unwrap();
1508
1509 thread
1510 .update(cx, |thread, _| thread.initialize())
1511 .await
1512 .unwrap();
1513 thread
1514 }
1515
1516 pub fn fake_acp_thread(
1517 project: Entity<Project>,
1518 cx: &mut TestAppContext,
1519 ) -> (Entity<AcpThread>, Entity<FakeAcpServer>) {
1520 let (stdin_tx, stdin_rx) = async_pipe::pipe();
1521 let (stdout_tx, stdout_rx) = async_pipe::pipe();
1522 let thread = cx.update(|cx| cx.new(|cx| AcpThread::fake(stdin_tx, stdout_rx, project, cx)));
1523 let agent = cx.update(|cx| cx.new(|cx| FakeAcpServer::new(stdin_rx, stdout_tx, cx)));
1524 (thread, agent)
1525 }
1526
1527 pub struct FakeAcpServer {
1528 connection: acp::ClientConnection,
1529 _io_task: Task<()>,
1530 on_user_message: Option<
1531 Rc<
1532 dyn Fn(
1533 acp::SendUserMessageParams,
1534 Entity<FakeAcpServer>,
1535 AsyncApp,
1536 ) -> LocalBoxFuture<'static, Result<()>>,
1537 >,
1538 >,
1539 }
1540
1541 #[derive(Clone)]
1542 struct FakeAgent {
1543 server: Entity<FakeAcpServer>,
1544 cx: AsyncApp,
1545 }
1546
1547 impl acp::Agent for FakeAgent {
1548 async fn initialize(&self) -> Result<acp::InitializeResponse> {
1549 Ok(acp::InitializeResponse {
1550 is_authenticated: true,
1551 })
1552 }
1553
1554 async fn authenticate(&self) -> Result<()> {
1555 Ok(())
1556 }
1557
1558 async fn cancel_send_message(&self) -> Result<()> {
1559 Ok(())
1560 }
1561
1562 async fn send_user_message(&self, request: acp::SendUserMessageParams) -> Result<()> {
1563 let mut cx = self.cx.clone();
1564 let handler = self
1565 .server
1566 .update(&mut cx, |server, _| server.on_user_message.clone())
1567 .ok()
1568 .flatten();
1569 if let Some(handler) = handler {
1570 handler(request, self.server.clone(), self.cx.clone()).await
1571 } else {
1572 anyhow::bail!("No handler for on_user_message")
1573 }
1574 }
1575 }
1576
1577 impl FakeAcpServer {
1578 fn new(stdin: PipeReader, stdout: PipeWriter, cx: &Context<Self>) -> Self {
1579 let agent = FakeAgent {
1580 server: cx.entity(),
1581 cx: cx.to_async(),
1582 };
1583 let foreground_executor = cx.foreground_executor().clone();
1584
1585 let (connection, io_fut) = acp::ClientConnection::connect_to_client(
1586 agent.clone(),
1587 stdout,
1588 stdin,
1589 move |fut| {
1590 foreground_executor.spawn(fut).detach();
1591 },
1592 );
1593 FakeAcpServer {
1594 connection: connection,
1595 on_user_message: None,
1596 _io_task: cx.background_spawn(async move {
1597 io_fut.await.log_err();
1598 }),
1599 }
1600 }
1601
1602 fn on_user_message<F>(
1603 &mut self,
1604 handler: impl for<'a> Fn(acp::SendUserMessageParams, Entity<FakeAcpServer>, AsyncApp) -> F
1605 + 'static,
1606 ) where
1607 F: Future<Output = Result<()>> + 'static,
1608 {
1609 self.on_user_message
1610 .replace(Rc::new(move |request, server, cx| {
1611 handler(request, server, cx).boxed_local()
1612 }));
1613 }
1614
1615 fn send_to_zed<T: acp::ClientRequest + 'static>(
1616 &self,
1617 message: T,
1618 ) -> BoxedLocal<Result<T::Response>> {
1619 self.connection
1620 .request(message)
1621 .map(|f| f.map_err(|err| anyhow!(err)))
1622 .boxed_local()
1623 }
1624 }
1625}