1mod connection;
2pub use connection::*;
3
4pub use acp_old::ToolCallId;
5use agent_client_protocol::{self as acp};
6use agentic_coding_protocol as acp_old;
7use anyhow::{Context as _, Result};
8use assistant_tool::ActionLog;
9use buffer_diff::BufferDiff;
10use editor::{Bias, MultiBuffer, PathKey};
11use futures::{FutureExt, channel::oneshot, future::BoxFuture};
12use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
13use itertools::Itertools;
14use language::{
15 Anchor, Buffer, BufferSnapshot, Capability, LanguageRegistry, OffsetRangeExt as _, Point,
16 text_diff,
17};
18use markdown::Markdown;
19use project::{AgentLocation, Project};
20use std::collections::HashMap;
21use std::error::Error;
22use std::fmt::{Formatter, Write};
23use std::{
24 fmt::Display,
25 mem,
26 path::{Path, PathBuf},
27 sync::Arc,
28};
29use ui::{App, IconName};
30use util::ResultExt;
31
32#[derive(Debug)]
33pub struct UserMessage {
34 pub content: ContentBlock,
35}
36
37impl UserMessage {
38 pub fn from_acp(
39 message: &[acp::ContentBlock],
40 language_registry: Arc<LanguageRegistry>,
41 cx: &mut App,
42 ) -> Self {
43 let mut content = ContentBlock::Empty;
44 for chunk in message {
45 content.append(chunk, &language_registry, cx)
46 }
47 Self { content: content }
48 }
49
50 fn to_markdown(&self, cx: &App) -> String {
51 format!("## User\n{}\n", self.content.to_markdown(cx))
52 }
53}
54
/// A borrowed file path that renders as a markdown `@file:` mention link.
#[derive(Debug)]
pub struct MentionPath<'a>(&'a Path);

impl<'a> MentionPath<'a> {
    /// Prefix that marks a link target as a file mention.
    const PREFIX: &'static str = "@file:";

    /// Wraps `path` without copying it.
    pub fn new(path: &'a Path) -> Self {
        Self(path)
    }

    /// Parses a mention link target.
    ///
    /// Returns `None` when `url` does not start with the `@file:` prefix;
    /// otherwise the remainder of the string is used verbatim as the path.
    pub fn try_parse(url: &'a str) -> Option<Self> {
        url.strip_prefix(Self::PREFIX)
            .map(|rest| Self(Path::new(rest)))
    }

    /// Returns the wrapped path.
    pub fn path(&self) -> &Path {
        self.0
    }
}

impl Display for MentionPath<'_> {
    /// Writes `[@<file name>](@file:<full path>)`; the file name is empty
    /// when the path has no final component.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let file_name = self
            .0
            .file_name()
            .map(Path::new)
            .unwrap_or_else(|| Path::new(""));
        write!(
            f,
            "[@{}]({}{})",
            file_name.display(),
            Self::PREFIX,
            self.0.display()
        )
    }
}
86
87#[derive(Debug, PartialEq)]
88pub struct AssistantMessage {
89 pub chunks: Vec<AssistantMessageChunk>,
90}
91
92impl AssistantMessage {
93 pub fn to_markdown(&self, cx: &App) -> String {
94 format!(
95 "## Assistant\n\n{}\n\n",
96 self.chunks
97 .iter()
98 .map(|chunk| chunk.to_markdown(cx))
99 .join("\n\n")
100 )
101 }
102}
103
104#[derive(Debug, PartialEq)]
105pub enum AssistantMessageChunk {
106 Message { block: ContentBlock },
107 Thought { block: ContentBlock },
108}
109
110impl AssistantMessageChunk {
111 pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
112 Self::Message {
113 block: ContentBlock::new(
114 &acp::ContentBlock::Text(acp::TextContent {
115 text: chunk.to_owned().into(),
116 annotations: None,
117 }),
118 language_registry,
119 cx,
120 ),
121 }
122 }
123
124 fn to_markdown(&self, cx: &App) -> String {
125 match self {
126 Self::Message { block } => block.to_markdown(cx).to_string(),
127 Self::Thought { block } => {
128 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
129 }
130 }
131 }
132}
133
134#[derive(Debug)]
135pub enum AgentThreadEntry {
136 UserMessage(UserMessage),
137 AssistantMessage(AssistantMessage),
138 ToolCall(ToolCall),
139}
140
141impl AgentThreadEntry {
142 fn to_markdown(&self, cx: &App) -> String {
143 match self {
144 Self::UserMessage(message) => message.to_markdown(cx),
145 Self::AssistantMessage(message) => message.to_markdown(cx),
146 Self::ToolCall(too_call) => too_call.to_markdown(cx),
147 }
148 }
149
150 pub fn diff(&self) -> Option<&Diff> {
151 if let AgentThreadEntry::ToolCall(ToolCall {
152 content: Some(ToolCallContent::Diff { diff }),
153 ..
154 }) = self
155 {
156 Some(&diff)
157 } else {
158 None
159 }
160 }
161
162 pub fn locations(&self) -> Option<&[acp::ToolCallLocation]> {
163 if let AgentThreadEntry::ToolCall(ToolCall { locations, .. }) = self {
164 Some(locations)
165 } else {
166 None
167 }
168 }
169}
170
171#[derive(Debug)]
172pub struct ToolCall {
173 pub id: acp::ToolCallId,
174 pub label: Entity<Markdown>,
175 pub kind: acp::ToolKind,
176 pub content: Option<ToolCallContent>,
177 pub status: ToolCallStatus,
178 pub locations: Vec<acp::ToolCallLocation>,
179}
180
181impl ToolCall {
182 fn to_markdown(&self, cx: &App) -> String {
183 let mut markdown = format!(
184 "**Tool Call: {}**\nStatus: {}\n\n",
185 self.label.read(cx).source(),
186 self.status
187 );
188 if let Some(content) = &self.content {
189 markdown.push_str(content.to_markdown(cx).as_str());
190 markdown.push_str("\n\n");
191 }
192 markdown
193 }
194}
195
196#[derive(Debug)]
197pub enum ToolCallStatus {
198 WaitingForConfirmation {
199 possible_grants: Vec<acp::Grant>,
200 respond_tx: oneshot::Sender<acp::GrantId>,
201 },
202 Allowed {
203 status: acp::ToolCallStatus,
204 },
205 Rejected,
206 Canceled,
207}
208
209impl Display for ToolCallStatus {
210 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
211 write!(
212 f,
213 "{}",
214 match self {
215 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
216 ToolCallStatus::Allowed { status } => match status {
217 acp::ToolCallStatus::InProgress => "In Progress",
218 acp::ToolCallStatus::Completed => "Completed",
219 acp::ToolCallStatus::Failed => "Failed",
220 },
221 ToolCallStatus::Rejected => "Rejected",
222 ToolCallStatus::Canceled => "Canceled",
223 }
224 )
225 }
226}
227
228#[derive(Debug)]
229pub enum ToolCallConfirmation {
230 Edit {
231 description: Option<Entity<Markdown>>,
232 },
233 Execute {
234 command: String,
235 root_command: String,
236 description: Option<Entity<Markdown>>,
237 },
238 Mcp {
239 server_name: String,
240 tool_name: String,
241 tool_display_name: String,
242 description: Option<Entity<Markdown>>,
243 },
244 Fetch {
245 urls: Vec<SharedString>,
246 description: Option<Entity<Markdown>>,
247 },
248 Other {
249 description: Entity<Markdown>,
250 },
251}
252
253impl ToolCallConfirmation {
254 pub fn from_acp(
255 confirmation: acp_old::ToolCallConfirmation,
256 language_registry: Arc<LanguageRegistry>,
257 cx: &mut App,
258 ) -> Self {
259 let to_md = |description: String, cx: &mut App| -> Entity<Markdown> {
260 cx.new(|cx| {
261 Markdown::new(
262 description.into(),
263 Some(language_registry.clone()),
264 None,
265 cx,
266 )
267 })
268 };
269
270 match confirmation {
271 acp_old::ToolCallConfirmation::Edit { description } => Self::Edit {
272 description: description.map(|description| to_md(description, cx)),
273 },
274 acp_old::ToolCallConfirmation::Execute {
275 command,
276 root_command,
277 description,
278 } => Self::Execute {
279 command,
280 root_command,
281 description: description.map(|description| to_md(description, cx)),
282 },
283 acp_old::ToolCallConfirmation::Mcp {
284 server_name,
285 tool_name,
286 tool_display_name,
287 description,
288 } => Self::Mcp {
289 server_name,
290 tool_name,
291 tool_display_name,
292 description: description.map(|description| to_md(description, cx)),
293 },
294 acp_old::ToolCallConfirmation::Fetch { urls, description } => Self::Fetch {
295 urls: urls.iter().map(|url| url.into()).collect(),
296 description: description.map(|description| to_md(description, cx)),
297 },
298 acp_old::ToolCallConfirmation::Other { description } => Self::Other {
299 description: to_md(description, cx),
300 },
301 }
302 }
303}
304
305#[derive(Debug, PartialEq, Clone)]
306enum ContentBlock {
307 Empty,
308 Markdown { markdown: Entity<Markdown> },
309}
310
311impl ContentBlock {
312 pub fn new(
313 block: &acp::ContentBlock,
314 language_registry: &Arc<LanguageRegistry>,
315 cx: &mut App,
316 ) -> Self {
317 let mut this = Self::Empty;
318 this.append(block, language_registry, cx);
319 this
320 }
321
322 pub fn new_combined(
323 blocks: &[acp::ContentBlock],
324 language_registry: &Arc<LanguageRegistry>,
325 cx: &mut App,
326 ) -> Self {
327 let mut this = Self::Empty;
328 for block in blocks {
329 this.append(block, language_registry, cx);
330 }
331 this
332 }
333
334 pub fn append(
335 &mut self,
336 block: &acp::ContentBlock,
337 language_registry: &Arc<LanguageRegistry>,
338 cx: &mut App,
339 ) {
340 let new_content = match block {
341 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
342 acp::ContentBlock::ResourceLink(resource_link) => {
343 if let Some(path) = resource_link.uri.strip_prefix("file://") {
344 format!("{}", MentionPath(path.as_ref()))
345 } else {
346 resource_link.uri.clone()
347 }
348 }
349 acp::ContentBlock::Image(_)
350 | acp::ContentBlock::Audio(_)
351 | acp::ContentBlock::Resource(_) => String::new(),
352 };
353
354 match self {
355 ContentBlock::Empty => {
356 *self = ContentBlock::Markdown {
357 markdown: cx.new(|cx| {
358 Markdown::new(
359 new_content.into(),
360 Some(language_registry.clone()),
361 None,
362 cx,
363 )
364 }),
365 };
366 }
367 ContentBlock::Markdown { markdown } => {
368 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
369 }
370 }
371 }
372
373 fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
374 match self {
375 ContentBlock::Empty => "",
376 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
377 }
378 }
379}
380
381#[derive(Debug)]
382pub enum ToolCallContent {
383 ContentBlock { content: ContentBlock },
384 Diff { diff: Diff },
385}
386
387impl ToolCallContent {
388 pub fn from_acp(
389 content: acp::ToolCallContent,
390 language_registry: Arc<LanguageRegistry>,
391 cx: &mut App,
392 ) -> Self {
393 match content {
394 acp_old::ToolCallContent::Markdown { markdown } => Self::Markdown {
395 markdown: cx.new(|cx| Markdown::new_text(markdown.into(), cx)),
396 },
397 acp_old::ToolCallContent::Diff { diff } => Self::Diff {
398 diff: Diff::from_acp(diff, language_registry, cx),
399 },
400 }
401 }
402
403 fn to_markdown(&self, cx: &App) -> String {
404 match self {
405 Self::Markdown { markdown } => markdown.read(cx).source().to_string(),
406 Self::Diff { diff } => diff.to_markdown(cx),
407 }
408 }
409}
410
411#[derive(Debug)]
412pub struct Diff {
413 pub multibuffer: Entity<MultiBuffer>,
414 pub path: PathBuf,
415 pub new_buffer: Entity<Buffer>,
416 pub old_buffer: Entity<Buffer>,
417 _task: Task<Result<()>>,
418}
419
420impl Diff {
421 pub fn from_acp(
422 diff: acp_old::Diff,
423 language_registry: Arc<LanguageRegistry>,
424 cx: &mut App,
425 ) -> Self {
426 let acp_old::Diff {
427 path,
428 old_text,
429 new_text,
430 } = diff;
431
432 let multibuffer = cx.new(|_cx| MultiBuffer::without_headers(Capability::ReadOnly));
433
434 let new_buffer = cx.new(|cx| Buffer::local(new_text, cx));
435 let old_buffer = cx.new(|cx| Buffer::local(old_text.unwrap_or("".into()), cx));
436 let new_buffer_snapshot = new_buffer.read(cx).text_snapshot();
437 let old_buffer_snapshot = old_buffer.read(cx).snapshot();
438 let buffer_diff = cx.new(|cx| BufferDiff::new(&new_buffer_snapshot, cx));
439 let diff_task = buffer_diff.update(cx, |diff, cx| {
440 diff.set_base_text(
441 old_buffer_snapshot,
442 Some(language_registry.clone()),
443 new_buffer_snapshot,
444 cx,
445 )
446 });
447
448 let task = cx.spawn({
449 let multibuffer = multibuffer.clone();
450 let path = path.clone();
451 let new_buffer = new_buffer.clone();
452 async move |cx| {
453 diff_task.await?;
454
455 multibuffer
456 .update(cx, |multibuffer, cx| {
457 let hunk_ranges = {
458 let buffer = new_buffer.read(cx);
459 let diff = buffer_diff.read(cx);
460 diff.hunks_intersecting_range(Anchor::MIN..Anchor::MAX, &buffer, cx)
461 .map(|diff_hunk| diff_hunk.buffer_range.to_point(&buffer))
462 .collect::<Vec<_>>()
463 };
464
465 multibuffer.set_excerpts_for_path(
466 PathKey::for_buffer(&new_buffer, cx),
467 new_buffer.clone(),
468 hunk_ranges,
469 editor::DEFAULT_MULTIBUFFER_CONTEXT,
470 cx,
471 );
472 multibuffer.add_diff(buffer_diff.clone(), cx);
473 })
474 .log_err();
475
476 if let Some(language) = language_registry
477 .language_for_file_path(&path)
478 .await
479 .log_err()
480 {
481 new_buffer.update(cx, |buffer, cx| buffer.set_language(Some(language), cx))?;
482 }
483
484 anyhow::Ok(())
485 }
486 });
487
488 Self {
489 multibuffer,
490 path,
491 new_buffer,
492 old_buffer,
493 _task: task,
494 }
495 }
496
497 fn to_markdown(&self, cx: &App) -> String {
498 let buffer_text = self
499 .multibuffer
500 .read(cx)
501 .all_buffers()
502 .iter()
503 .map(|buffer| buffer.read(cx).text())
504 .join("\n");
505 format!("Diff: {}\n```\n{}\n```\n", self.path.display(), buffer_text)
506 }
507}
508
509#[derive(Debug, Default)]
510pub struct Plan {
511 pub entries: Vec<PlanEntry>,
512}
513
514#[derive(Debug)]
515pub struct PlanStats<'a> {
516 pub in_progress_entry: Option<&'a PlanEntry>,
517 pub pending: u32,
518 pub completed: u32,
519}
520
521impl Plan {
522 pub fn is_empty(&self) -> bool {
523 self.entries.is_empty()
524 }
525
526 pub fn stats(&self) -> PlanStats<'_> {
527 let mut stats = PlanStats {
528 in_progress_entry: None,
529 pending: 0,
530 completed: 0,
531 };
532
533 for entry in &self.entries {
534 match &entry.status {
535 acp_old::PlanEntryStatus::Pending => {
536 stats.pending += 1;
537 }
538 acp_old::PlanEntryStatus::InProgress => {
539 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
540 }
541 acp_old::PlanEntryStatus::Completed => {
542 stats.completed += 1;
543 }
544 }
545 }
546
547 stats
548 }
549}
550
551#[derive(Debug)]
552pub struct PlanEntry {
553 pub content: Entity<Markdown>,
554 pub priority: acp_old::PlanEntryPriority,
555 pub status: acp_old::PlanEntryStatus,
556}
557
558impl PlanEntry {
559 pub fn from_acp(entry: acp_old::PlanEntry, cx: &mut App) -> Self {
560 Self {
561 content: cx.new(|cx| Markdown::new_text(entry.content.into(), cx)),
562 priority: entry.priority,
563 status: entry.status,
564 }
565 }
566}
567
568pub struct AcpThread {
569 title: SharedString,
570 entries: Vec<AgentThreadEntry>,
571 plan: Plan,
572 project: Entity<Project>,
573 action_log: Entity<ActionLog>,
574 shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
575 send_task: Option<Task<()>>,
576 connection: Arc<dyn AgentConnection>,
577 child_status: Option<Task<Result<()>>>,
578}
579
580pub enum AcpThreadEvent {
581 NewEntry,
582 EntryUpdated(usize),
583}
584
585impl EventEmitter<AcpThreadEvent> for AcpThread {}
586
/// Coarse activity state of a thread, as reported by `AcpThread::status`.
#[derive(PartialEq, Eq)]
pub enum ThreadStatus {
    /// No send is in flight.
    Idle,
    /// A send is in flight and a tool call of the current turn awaits
    /// confirmation.
    WaitingForToolConfirmation,
    /// A send is in flight and nothing awaits confirmation.
    Generating,
}
593
594#[derive(Debug, Clone)]
595pub enum LoadError {
596 Unsupported {
597 error_message: SharedString,
598 upgrade_message: SharedString,
599 upgrade_command: String,
600 },
601 Exited(i32),
602 Other(SharedString),
603}
604
605impl Display for LoadError {
606 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
607 match self {
608 LoadError::Unsupported { error_message, .. } => write!(f, "{}", error_message),
609 LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
610 LoadError::Other(msg) => write!(f, "{}", msg),
611 }
612 }
613}
614
615impl Error for LoadError {}
616
617impl AcpThread {
618 pub fn new(
619 connection: impl AgentConnection + 'static,
620 title: SharedString,
621 child_status: Option<Task<Result<()>>>,
622 project: Entity<Project>,
623 cx: &mut Context<Self>,
624 ) -> Self {
625 let action_log = cx.new(|_| ActionLog::new(project.clone()));
626
627 Self {
628 action_log,
629 shared_buffers: Default::default(),
630 entries: Default::default(),
631 plan: Default::default(),
632 title,
633 project,
634 send_task: None,
635 connection: Arc::new(connection),
636 child_status,
637 }
638 }
639
640 /// Send a request to the agent and wait for a response.
641 pub fn request<R: acp_old::AgentRequest + 'static>(
642 &self,
643 params: R,
644 ) -> impl use<R> + Future<Output = Result<R::Response>> {
645 let params = params.into_any();
646 let result = self.connection.request_any(params);
647 async move {
648 let result = result.await?;
649 Ok(R::response_from_any(result)?)
650 }
651 }
652
653 pub fn action_log(&self) -> &Entity<ActionLog> {
654 &self.action_log
655 }
656
657 pub fn project(&self) -> &Entity<Project> {
658 &self.project
659 }
660
661 pub fn title(&self) -> SharedString {
662 self.title.clone()
663 }
664
665 pub fn entries(&self) -> &[AgentThreadEntry] {
666 &self.entries
667 }
668
669 pub fn status(&self) -> ThreadStatus {
670 if self.send_task.is_some() {
671 if self.waiting_for_tool_confirmation() {
672 ThreadStatus::WaitingForToolConfirmation
673 } else {
674 ThreadStatus::Generating
675 }
676 } else {
677 ThreadStatus::Idle
678 }
679 }
680
681 pub fn has_pending_edit_tool_calls(&self) -> bool {
682 for entry in self.entries.iter().rev() {
683 match entry {
684 AgentThreadEntry::UserMessage(_) => return false,
685 AgentThreadEntry::ToolCall(ToolCall {
686 status:
687 ToolCallStatus::Allowed {
688 status: acp::ToolCallStatus::InProgress,
689 ..
690 },
691 content: Some(ToolCallContent::Diff { .. }),
692 ..
693 }) => return true,
694 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
695 }
696 }
697
698 false
699 }
700
701 pub fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
702 self.entries.push(entry);
703 cx.emit(AcpThreadEvent::NewEntry);
704 }
705
706 pub fn push_assistant_chunk(
707 &mut self,
708 chunk: acp::ContentBlock,
709 is_thought: bool,
710 cx: &mut Context<Self>,
711 ) {
712 let language_registry = self.project.read(cx).languages().clone();
713 let entries_len = self.entries.len();
714 if let Some(last_entry) = self.entries.last_mut()
715 && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
716 {
717 cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
718 match (chunks.last_mut(), is_thought) {
719 (Some(AssistantMessageChunk::Message { block }), false)
720 | (Some(AssistantMessageChunk::Thought { block }), true) => {
721 block.append(&chunk, &language_registry, cx)
722 }
723 _ => {
724 let block = ContentBlock::new(&chunk, &language_registry, cx);
725 if is_thought {
726 chunks.push(AssistantMessageChunk::Thought { block })
727 } else {
728 chunks.push(AssistantMessageChunk::Message { block })
729 }
730 }
731 }
732 } else {
733 let block = ContentBlock::new(&chunk, &language_registry, cx);
734 let chunk = if is_thought {
735 AssistantMessageChunk::Thought { block }
736 } else {
737 AssistantMessageChunk::Message { block }
738 };
739
740 self.push_entry(
741 AgentThreadEntry::AssistantMessage(AssistantMessage {
742 chunks: vec![chunk],
743 }),
744 cx,
745 );
746 }
747 }
748
749 pub fn update_tool_call(
750 &mut self,
751 tool_call: acp::ToolCall,
752 cx: &mut Context<Self>,
753 ) -> Result<()> {
754 let language_registry = self.project.read(cx).languages().clone();
755
756 let new_tool_call = ToolCall {
757 id: tool_call.id,
758 label: cx.new(|cx| {
759 Markdown::new(
760 tool_call.label.into(),
761 Some(language_registry.clone()),
762 None,
763 cx,
764 )
765 }),
766 kind: tool_call.kind,
767 content: tool_call
768 .content
769 .into_iter()
770 .map(|content| ToolCallContent::from_acp(content, language_registry, cx))
771 .collect(),
772 locations: tool_call.locations,
773 status: ToolCallStatus::Allowed {
774 status: tool_call.status,
775 },
776 };
777
778 if let Some((ix, current_call)) = self.tool_call_mut(tool_call.id) {
779 match &mut current_call.status {
780 ToolCallStatus::Allowed { status } => {
781 *status = tool_call.status;
782 }
783 ToolCallStatus::WaitingForConfirmation { .. } => {
784 anyhow::bail!("Tool call hasn't been authorized yet")
785 }
786 ToolCallStatus::Rejected => {
787 anyhow::bail!("Tool call was rejected and therefore can't be updated")
788 }
789 ToolCallStatus::Canceled => {
790 current_call.status = ToolCallStatus::Allowed { status: new_status };
791 }
792 }
793
794 *current_call = new_tool_call;
795
796 let location = current_call.locations.last().cloned();
797 if let Some(location) = location {
798 self.set_project_location(location, cx)
799 }
800
801 cx.emit(AcpThreadEvent::EntryUpdated(ix));
802 } else {
803 let language_registry = self.project.read(cx).languages().clone();
804 let call = ToolCall {
805 id: tool_call.id,
806 label: cx.new(|cx| {
807 Markdown::new(
808 tool_call.label.into(),
809 Some(language_registry.clone()),
810 None,
811 cx,
812 )
813 }),
814 kind: tool_call.kind,
815 content: tool_call
816 .content
817 .into_iter()
818 .map(|content| ToolCallContent::from_acp(content, language_registry, cx))
819 .collect(),
820 locations: tool_call.locations,
821 status: ToolCallStatus::Allowed {
822 status: tool_call.status,
823 },
824 };
825
826 let location = call.locations.last().cloned();
827 if let Some(location) = location {
828 self.set_project_location(location, cx)
829 }
830
831 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
832 }
833
834 Ok(())
835 }
836
837 fn tool_call_mut(&mut self, id: acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
838 self.entries
839 .iter_mut()
840 .enumerate()
841 .rev()
842 .find_map(|(index, tool_call)| {
843 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
844 && tool_call.id == id
845 {
846 Some((index, tool_call))
847 } else {
848 None
849 }
850 })
851 }
852
853 pub fn request_tool_call_permission(
854 &mut self,
855 tool_call: acp::ToolCall,
856 possible_grants: Vec<acp::Grant>,
857 cx: &mut Context<Self>,
858 ) -> ToolCallRequest {
859 let (tx, rx) = oneshot::channel();
860
861 let status = ToolCallStatus::WaitingForConfirmation {
862 possible_grants,
863 respond_tx: tx,
864 };
865
866 let id = self.insert_tool_call(tool_call, status, cx);
867 ToolCallRequest { id, outcome: rx }
868 }
869
870 pub fn request_tool_call_confirmation(
871 &mut self,
872 tool_call_id: ToolCallId,
873 confirmation: acp_old::ToolCallConfirmation,
874 cx: &mut Context<Self>,
875 ) -> Result<ToolCallRequest> {
876 let project = self.project.read(cx).languages().clone();
877 let Some((idx, call)) = self.tool_call_mut(tool_call_id) else {
878 anyhow::bail!("Tool call not found");
879 };
880
881 let (tx, rx) = oneshot::channel();
882
883 call.status = ToolCallStatus::WaitingForConfirmation {
884 confirmation: ToolCallConfirmation::from_acp(confirmation, project, cx),
885 respond_tx: tx,
886 };
887
888 cx.emit(AcpThreadEvent::EntryUpdated(idx));
889
890 Ok(ToolCallRequest {
891 id: tool_call_id,
892 outcome: rx,
893 })
894 }
895
896 fn insert_tool_call(
897 &mut self,
898 tool_call: acp_old::PushToolCallParams,
899 status: ToolCallStatus,
900 cx: &mut Context<Self>,
901 ) -> acp_old::ToolCallId {
902 let language_registry = self.project.read(cx).languages().clone();
903 let id = acp_old::ToolCallId(self.entries.len() as u64);
904 let call = ToolCall {
905 id,
906 label: cx.new(|cx| {
907 Markdown::new(
908 tool_call.label.into(),
909 Some(language_registry.clone()),
910 None,
911 cx,
912 )
913 }),
914 icon: acp_icon_to_ui_icon(tool_call.icon),
915 content: tool_call
916 .content
917 .map(|content| ToolCallContent::from_acp(content, language_registry, cx)),
918 locations: tool_call.locations,
919 status,
920 };
921
922 let location = call.locations.last().cloned();
923 if let Some(location) = location {
924 self.set_project_location(location, cx)
925 }
926
927 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
928
929 id
930 }
931
932 pub fn authorize_tool_call(
933 &mut self,
934 id: acp::ToolCallId,
935 grant: acp::Grant,
936 cx: &mut Context<Self>,
937 ) {
938 let Some((ix, call)) = self.tool_call_mut(id) else {
939 return;
940 };
941
942 let new_status = if grant.is_allowed {
943 ToolCallStatus::Allowed {
944 status: acp::ToolCallStatus::InProgress,
945 }
946 } else {
947 ToolCallStatus::Rejected
948 };
949
950 let curr_status = mem::replace(&mut call.status, new_status);
951
952 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
953 respond_tx.send(grant.id).log_err();
954 } else if cfg!(debug_assertions) {
955 panic!("tried to authorize an already authorized tool call");
956 }
957
958 cx.emit(AcpThreadEvent::EntryUpdated(ix));
959 }
960
961 pub fn plan(&self) -> &Plan {
962 &self.plan
963 }
964
965 pub fn update_plan(&mut self, request: acp_old::UpdatePlanParams, cx: &mut Context<Self>) {
966 self.plan = Plan {
967 entries: request
968 .entries
969 .into_iter()
970 .map(|entry| PlanEntry::from_acp(entry, cx))
971 .collect(),
972 };
973
974 cx.notify();
975 }
976
977 pub fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
978 self.plan
979 .entries
980 .retain(|entry| !matches!(entry.status, acp_old::PlanEntryStatus::Completed));
981 cx.notify();
982 }
983
984 pub fn set_project_location(&self, location: acp::ToolCallLocation, cx: &mut Context<Self>) {
985 self.project.update(cx, |project, cx| {
986 let Some(path) = project.project_path_for_absolute_path(&location.path, cx) else {
987 return;
988 };
989 let buffer = project.open_buffer(path, cx);
990 cx.spawn(async move |project, cx| {
991 let buffer = buffer.await?;
992
993 project.update(cx, |project, cx| {
994 let position = if let Some(line) = location.line {
995 let snapshot = buffer.read(cx).snapshot();
996 let point = snapshot.clip_point(Point::new(line, 0), Bias::Left);
997 snapshot.anchor_before(point)
998 } else {
999 Anchor::MIN
1000 };
1001
1002 project.set_agent_location(
1003 Some(AgentLocation {
1004 buffer: buffer.downgrade(),
1005 position,
1006 }),
1007 cx,
1008 );
1009 })
1010 })
1011 .detach_and_log_err(cx);
1012 });
1013 }
1014
1015 /// Returns true if the last turn is awaiting tool authorization
1016 pub fn waiting_for_tool_confirmation(&self) -> bool {
1017 for entry in self.entries.iter().rev() {
1018 match &entry {
1019 AgentThreadEntry::ToolCall(call) => match call.status {
1020 ToolCallStatus::WaitingForConfirmation { .. } => return true,
1021 ToolCallStatus::Allowed { .. }
1022 | ToolCallStatus::Rejected
1023 | ToolCallStatus::Canceled => continue,
1024 },
1025 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1026 // Reached the beginning of the turn
1027 return false;
1028 }
1029 }
1030 }
1031 false
1032 }
1033
1034 pub fn initialize(&self) -> impl use<> + Future<Output = Result<acp_old::InitializeResponse>> {
1035 self.request(acp_old::InitializeParams {
1036 protocol_version: acp_old::ProtocolVersion::latest(),
1037 })
1038 }
1039
1040 pub fn authenticate(&self) -> impl use<> + Future<Output = Result<()>> {
1041 self.request(acp_old::AuthenticateParams)
1042 }
1043
1044 #[cfg(any(test, feature = "test-support"))]
1045 pub fn send_raw(
1046 &mut self,
1047 message: &str,
1048 cx: &mut Context<Self>,
1049 ) -> BoxFuture<'static, Result<(), acp_old::Error>> {
1050 self.send(
1051 vec![acp::ContentBlock::Text(acp::TextContent {
1052 text: message.to_string(),
1053 annotations: None,
1054 })],
1055 cx,
1056 )
1057 }
1058
1059 pub fn send(
1060 &mut self,
1061 message: Vec<acp::ContentBlock>,
1062 cx: &mut Context<Self>,
1063 ) -> BoxFuture<'static, Result<(), acp_old::Error>> {
1064 let block = ContentBlock::new_combined(&message, self.project.read(cx).languages(), cx);
1065 self.push_entry(
1066 AgentThreadEntry::UserMessage(UserMessage { content: block }),
1067 cx,
1068 );
1069
1070 let (tx, rx) = oneshot::channel();
1071 let cancel = self.cancel(cx);
1072
1073 self.send_task = Some(cx.spawn(async move |this, cx| {
1074 async {
1075 cancel.await.log_err();
1076
1077 let result = this.update(cx, |this, _| this.request(message))?.await;
1078 tx.send(result).log_err();
1079 this.update(cx, |this, _cx| this.send_task.take())?;
1080 anyhow::Ok(())
1081 }
1082 .await
1083 .log_err();
1084 }));
1085
1086 async move {
1087 match rx.await {
1088 Ok(Err(e)) => Err(e)?,
1089 _ => Ok(()),
1090 }
1091 }
1092 .boxed()
1093 }
1094
1095 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<Result<(), acp_old::Error>> {
1096 if self.send_task.take().is_some() {
1097 let request = self.request(acp_old::CancelSendMessageParams);
1098 cx.spawn(async move |this, cx| {
1099 request.await?;
1100 this.update(cx, |this, _cx| {
1101 for entry in this.entries.iter_mut() {
1102 if let AgentThreadEntry::ToolCall(call) = entry {
1103 let cancel = matches!(
1104 call.status,
1105 ToolCallStatus::WaitingForConfirmation { .. }
1106 | ToolCallStatus::Allowed {
1107 status: acp_old::ToolCallStatus::Running
1108 }
1109 );
1110
1111 if cancel {
1112 let curr_status =
1113 mem::replace(&mut call.status, ToolCallStatus::Canceled);
1114
1115 if let ToolCallStatus::WaitingForConfirmation {
1116 respond_tx, ..
1117 } = curr_status
1118 {
1119 respond_tx
1120 .send(acp_old::ToolCallConfirmationOutcome::Cancel)
1121 .ok();
1122 }
1123 }
1124 }
1125 }
1126 })?;
1127 Ok(())
1128 })
1129 } else {
1130 Task::ready(Ok(()))
1131 }
1132 }
1133
1134 pub fn read_text_file(
1135 &self,
1136 request: acp_old::ReadTextFileParams,
1137 reuse_shared_snapshot: bool,
1138 cx: &mut Context<Self>,
1139 ) -> Task<Result<String>> {
1140 let project = self.project.clone();
1141 let action_log = self.action_log.clone();
1142 cx.spawn(async move |this, cx| {
1143 let load = project.update(cx, |project, cx| {
1144 let path = project
1145 .project_path_for_absolute_path(&request.path, cx)
1146 .context("invalid path")?;
1147 anyhow::Ok(project.open_buffer(path, cx))
1148 });
1149 let buffer = load??.await?;
1150
1151 let snapshot = if reuse_shared_snapshot {
1152 this.read_with(cx, |this, _| {
1153 this.shared_buffers.get(&buffer.clone()).cloned()
1154 })
1155 .log_err()
1156 .flatten()
1157 } else {
1158 None
1159 };
1160
1161 let snapshot = if let Some(snapshot) = snapshot {
1162 snapshot
1163 } else {
1164 action_log.update(cx, |action_log, cx| {
1165 action_log.buffer_read(buffer.clone(), cx);
1166 })?;
1167 project.update(cx, |project, cx| {
1168 let position = buffer
1169 .read(cx)
1170 .snapshot()
1171 .anchor_before(Point::new(request.line.unwrap_or_default(), 0));
1172 project.set_agent_location(
1173 Some(AgentLocation {
1174 buffer: buffer.downgrade(),
1175 position,
1176 }),
1177 cx,
1178 );
1179 })?;
1180
1181 buffer.update(cx, |buffer, _| buffer.snapshot())?
1182 };
1183
1184 this.update(cx, |this, _| {
1185 let text = snapshot.text();
1186 this.shared_buffers.insert(buffer.clone(), snapshot);
1187 if request.line.is_none() && request.limit.is_none() {
1188 return Ok(text);
1189 }
1190 let limit = request.limit.unwrap_or(u32::MAX) as usize;
1191 let Some(line) = request.line else {
1192 return Ok(text.lines().take(limit).collect::<String>());
1193 };
1194
1195 let count = text.lines().count();
1196 if count < line as usize {
1197 anyhow::bail!("There are only {} lines", count);
1198 }
1199 Ok(text
1200 .lines()
1201 .skip(line as usize + 1)
1202 .take(limit)
1203 .collect::<String>())
1204 })?
1205 })
1206 }
1207
1208 pub fn write_text_file(
1209 &self,
1210 path: PathBuf,
1211 content: String,
1212 cx: &mut Context<Self>,
1213 ) -> Task<Result<()>> {
1214 let project = self.project.clone();
1215 let action_log = self.action_log.clone();
1216 cx.spawn(async move |this, cx| {
1217 let load = project.update(cx, |project, cx| {
1218 let path = project
1219 .project_path_for_absolute_path(&path, cx)
1220 .context("invalid path")?;
1221 anyhow::Ok(project.open_buffer(path, cx))
1222 });
1223 let buffer = load??.await?;
1224 let snapshot = this.update(cx, |this, cx| {
1225 this.shared_buffers
1226 .get(&buffer)
1227 .cloned()
1228 .unwrap_or_else(|| buffer.read(cx).snapshot())
1229 })?;
1230 let edits = cx
1231 .background_executor()
1232 .spawn(async move {
1233 let old_text = snapshot.text();
1234 text_diff(old_text.as_str(), &content)
1235 .into_iter()
1236 .map(|(range, replacement)| {
1237 (
1238 snapshot.anchor_after(range.start)
1239 ..snapshot.anchor_before(range.end),
1240 replacement,
1241 )
1242 })
1243 .collect::<Vec<_>>()
1244 })
1245 .await;
1246 cx.update(|cx| {
1247 project.update(cx, |project, cx| {
1248 project.set_agent_location(
1249 Some(AgentLocation {
1250 buffer: buffer.downgrade(),
1251 position: edits
1252 .last()
1253 .map(|(range, _)| range.end)
1254 .unwrap_or(Anchor::MIN),
1255 }),
1256 cx,
1257 );
1258 });
1259
1260 action_log.update(cx, |action_log, cx| {
1261 action_log.buffer_read(buffer.clone(), cx);
1262 });
1263 buffer.update(cx, |buffer, cx| {
1264 buffer.edit(edits, None, cx);
1265 });
1266 action_log.update(cx, |action_log, cx| {
1267 action_log.buffer_edited(buffer.clone(), cx);
1268 });
1269 })?;
1270 project
1271 .update(cx, |project, cx| project.save_buffer(buffer, cx))?
1272 .await
1273 })
1274 }
1275
1276 pub fn child_status(&mut self) -> Option<Task<Result<()>>> {
1277 self.child_status.take()
1278 }
1279
1280 pub fn to_markdown(&self, cx: &App) -> String {
1281 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
1282 }
1283}
1284
1285#[derive(Clone)]
1286pub struct OldAcpClientDelegate {
1287 thread: WeakEntity<AcpThread>,
1288 cx: AsyncApp,
1289 // sent_buffer_versions: HashMap<Entity<Buffer>, HashMap<u64, BufferSnapshot>>,
1290}
1291
1292impl OldAcpClientDelegate {
1293 pub fn new(thread: WeakEntity<AcpThread>, cx: AsyncApp) -> Self {
1294 Self { thread, cx }
1295 }
1296
1297 pub async fn clear_completed_plan_entries(&self) -> Result<()> {
1298 let cx = &mut self.cx.clone();
1299 cx.update(|cx| {
1300 self.thread
1301 .update(cx, |thread, cx| thread.clear_completed_plan_entries(cx))
1302 })?
1303 .context("Failed to update thread")?;
1304
1305 Ok(())
1306 }
1307
1308 pub async fn request_existing_tool_call_confirmation(
1309 &self,
1310 tool_call_id: ToolCallId,
1311 confirmation: acp_old::ToolCallConfirmation,
1312 ) -> Result<acp_old::ToolCallConfirmationOutcome> {
1313 let cx = &mut self.cx.clone();
1314 let ToolCallRequest { outcome, .. } = cx
1315 .update(|cx| {
1316 self.thread.update(cx, |thread, cx| {
1317 thread.request_tool_call_confirmation(tool_call_id, confirmation, cx)
1318 })
1319 })?
1320 .context("Failed to update thread")??;
1321
1322 Ok(outcome.await?)
1323 }
1324
1325 pub async fn read_text_file_reusing_snapshot(
1326 &self,
1327 request: acp_old::ReadTextFileParams,
1328 ) -> Result<acp_old::ReadTextFileResponse, acp_old::Error> {
1329 let content = self
1330 .cx
1331 .update(|cx| {
1332 self.thread
1333 .update(cx, |thread, cx| thread.read_text_file(request, true, cx))
1334 })?
1335 .context("Failed to update thread")?
1336 .await?;
1337 Ok(acp_old::ReadTextFileResponse { content })
1338 }
1339}
1340
1341impl acp_old::Client for OldAcpClientDelegate {
1342 async fn stream_assistant_message_chunk(
1343 &self,
1344 params: acp_old::StreamAssistantMessageChunkParams,
1345 ) -> Result<(), acp_old::Error> {
1346 let cx = &mut self.cx.clone();
1347
1348 cx.update(|cx| {
1349 self.thread
1350 .update(cx, |thread, cx| {
1351 thread.push_assistant_chunk(params.chunk, cx)
1352 })
1353 .ok();
1354 })?;
1355
1356 Ok(())
1357 }
1358
1359 async fn request_tool_call_confirmation(
1360 &self,
1361 request: acp_old::RequestToolCallConfirmationParams,
1362 ) -> Result<acp_old::RequestToolCallConfirmationResponse, acp_old::Error> {
1363 let cx = &mut self.cx.clone();
1364 let ToolCallRequest { id, outcome } = cx
1365 .update(|cx| {
1366 self.thread
1367 .update(cx, |thread, cx| thread.request_new_tool_call(request, cx))
1368 })?
1369 .context("Failed to update thread")?;
1370
1371 Ok(acp_old::RequestToolCallConfirmationResponse {
1372 id,
1373 outcome: outcome.await.map_err(acp_old::Error::into_internal_error)?,
1374 })
1375 }
1376
1377 async fn push_tool_call(
1378 &self,
1379 request: acp_old::PushToolCallParams,
1380 ) -> Result<acp_old::PushToolCallResponse, acp_old::Error> {
1381 let cx = &mut self.cx.clone();
1382 let id = cx
1383 .update(|cx| {
1384 self.thread
1385 .update(cx, |thread, cx| thread.push_tool_call(request, cx))
1386 })?
1387 .context("Failed to update thread")?;
1388
1389 Ok(acp_old::PushToolCallResponse { id })
1390 }
1391
1392 async fn update_tool_call(
1393 &self,
1394 request: acp_old::UpdateToolCallParams,
1395 ) -> Result<(), acp_old::Error> {
1396 let cx = &mut self.cx.clone();
1397
1398 cx.update(|cx| {
1399 self.thread.update(cx, |thread, cx| {
1400 thread.update_tool_call(request.tool_call_id, request.status, request.content, cx)
1401 })
1402 })?
1403 .context("Failed to update thread")??;
1404
1405 Ok(())
1406 }
1407
1408 async fn update_plan(&self, request: acp_old::UpdatePlanParams) -> Result<(), acp_old::Error> {
1409 let cx = &mut self.cx.clone();
1410
1411 cx.update(|cx| {
1412 self.thread
1413 .update(cx, |thread, cx| thread.update_plan(request, cx))
1414 })?
1415 .context("Failed to update thread")?;
1416
1417 Ok(())
1418 }
1419
1420 async fn read_text_file(
1421 &self,
1422 request: acp_old::ReadTextFileParams,
1423 ) -> Result<acp_old::ReadTextFileResponse, acp_old::Error> {
1424 let content = self
1425 .cx
1426 .update(|cx| {
1427 self.thread
1428 .update(cx, |thread, cx| thread.read_text_file(request, false, cx))
1429 })?
1430 .context("Failed to update thread")?
1431 .await?;
1432 Ok(acp_old::ReadTextFileResponse { content })
1433 }
1434
1435 async fn write_text_file(
1436 &self,
1437 request: acp_old::WriteTextFileParams,
1438 ) -> Result<(), acp_old::Error> {
1439 self.cx
1440 .update(|cx| {
1441 self.thread.update(cx, |thread, cx| {
1442 thread.write_text_file(request.path, request.content, cx)
1443 })
1444 })?
1445 .context("Failed to update thread")?
1446 .await?;
1447
1448 Ok(())
1449 }
1450}
1451
1452fn acp_icon_to_ui_icon(icon: acp_old::Icon) -> IconName {
1453 match icon {
1454 acp_old::Icon::FileSearch => IconName::ToolSearch,
1455 acp_old::Icon::Folder => IconName::ToolFolder,
1456 acp_old::Icon::Globe => IconName::ToolWeb,
1457 acp_old::Icon::Hammer => IconName::ToolHammer,
1458 acp_old::Icon::LightBulb => IconName::ToolBulb,
1459 acp_old::Icon::Pencil => IconName::ToolPencil,
1460 acp_old::Icon::Regex => IconName::ToolRegex,
1461 acp_old::Icon::Terminal => IconName::ToolTerminal,
1462 }
1463}
1464
1465#[cfg(test)]
1466mod tests {
1467 use super::*;
1468 use anyhow::anyhow;
1469 use async_pipe::{PipeReader, PipeWriter};
1470 use futures::{channel::mpsc, future::LocalBoxFuture, select};
1471 use gpui::{AsyncApp, TestAppContext};
1472 use indoc::indoc;
1473 use project::FakeFs;
1474 use serde_json::json;
1475 use settings::SettingsStore;
1476 use smol::{future::BoxedLocal, stream::StreamExt as _};
1477 use std::{cell::RefCell, rc::Rc, time::Duration};
1478 use util::path;
1479
1480 fn init_test(cx: &mut TestAppContext) {
1481 env_logger::try_init().ok();
1482 cx.update(|cx| {
1483 let settings_store = SettingsStore::test(cx);
1484 cx.set_global(settings_store);
1485 Project::init_settings(cx);
1486 language::init(cx);
1487 });
1488 }
1489
1490 #[gpui::test]
1491 async fn test_thinking_concatenation(cx: &mut TestAppContext) {
1492 init_test(cx);
1493
1494 let fs = FakeFs::new(cx.executor());
1495 let project = Project::test(fs, [], cx).await;
1496 let (thread, fake_server) = fake_acp_thread(project, cx);
1497
1498 fake_server.update(cx, |fake_server, _| {
1499 fake_server.on_user_message(move |_, server, mut cx| async move {
1500 server
1501 .update(&mut cx, |server, _| {
1502 server.send_to_zed(acp_old::StreamAssistantMessageChunkParams {
1503 chunk: acp_old::AssistantMessageChunk::Thought {
1504 thought: "Thinking ".into(),
1505 },
1506 })
1507 })?
1508 .await
1509 .unwrap();
1510 server
1511 .update(&mut cx, |server, _| {
1512 server.send_to_zed(acp_old::StreamAssistantMessageChunkParams {
1513 chunk: acp_old::AssistantMessageChunk::Thought {
1514 thought: "hard!".into(),
1515 },
1516 })
1517 })?
1518 .await
1519 .unwrap();
1520
1521 Ok(())
1522 })
1523 });
1524
1525 thread
1526 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
1527 .await
1528 .unwrap();
1529
1530 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
1531 assert_eq!(
1532 output,
1533 indoc! {r#"
1534 ## User
1535
1536 Hello from Zed!
1537
1538 ## Assistant
1539
1540 <thinking>
1541 Thinking hard!
1542 </thinking>
1543
1544 "#}
1545 );
1546 }
1547
/// Verifies that a file write requested by the agent is combined with an edit
/// the user made after the agent read the file, instead of overwriting it.
///
/// The agent reads `/tmp/foo`, the user then prepends a line to the open
/// buffer, and the agent writes back its version with two lines appended. Both
/// the buffer and the file on disk are expected to contain the user's line and
/// the agent's lines.
#[gpui::test]
async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
        .await;
    let project = Project::test(fs.clone(), [], cx).await;
    let (thread, fake_server) = fake_acp_thread(project.clone(), cx);
    let (worktree, pathbuf) = project
        .update(cx, |project, cx| {
            project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
        })
        .await
        .unwrap();
    // Open the buffer up front so the test can edit it as the user later on.
    let buffer = project
        .update(cx, |project, cx| {
            project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
        })
        .await
        .unwrap();

    // Signals the test body once the agent's read has completed. The sender is
    // wrapped so the handler can take it out of a shared cell.
    let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
    let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));

    fake_server.update(cx, |fake_server, _| {
        fake_server.on_user_message(move |_, server, mut cx| {
            let read_file_tx = read_file_tx.clone();
            async move {
                // Step 1: the agent reads the whole file.
                let content = server
                    .update(&mut cx, |server, _| {
                        server.send_to_zed(acp_old::ReadTextFileParams {
                            path: path!("/tmp/foo").into(),
                            line: None,
                            limit: None,
                        })
                    })?
                    .await
                    .unwrap();
                assert_eq!(content.content, "one\ntwo\nthree\n");
                // Step 2: let the test body know the read is done.
                read_file_tx.take().unwrap().send(()).unwrap();
                // Step 3: the agent writes back what it read plus two new lines.
                server
                    .update(&mut cx, |server, _| {
                        server.send_to_zed(acp_old::WriteTextFileParams {
                            path: path!("/tmp/foo").into(),
                            content: "one\ntwo\nthree\nfour\nfive\n".to_string(),
                        })
                    })?
                    .await
                    .unwrap();
                Ok(())
            }
        })
    });

    let request = thread.update(cx, |thread, cx| {
        thread.send_raw("Extend the count in /tmp/foo", cx)
    });
    // Wait until the agent has read the file, then edit the buffer as the user.
    // NOTE(review): presumably this lands before the agent's write is applied — confirm.
    read_file_rx.await.ok();
    buffer.update(cx, |buffer, cx| {
        buffer.edit([(0..0, "zero\n".to_string())], None, cx);
    });
    cx.run_until_parked();
    // The buffer holds both the user's prepended line and the agent's appended lines.
    assert_eq!(
        buffer.read_with(cx, |buffer, _| buffer.text()),
        "zero\none\ntwo\nthree\nfour\nfive\n"
    );
    // The same combined text has been saved to disk.
    assert_eq!(
        String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
        "zero\none\ntwo\nthree\nfour\nfive\n"
    );
    request.await.unwrap();
}
1621
/// Verifies that a tool call marked as canceled can still be moved to
/// `Finished` by a later update from the agent.
///
/// The tool call at entry index 1 is expected to go through three states:
/// running, canceled after `cancel`, and finished after the agent's update.
#[gpui::test]
async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;
    let (thread, fake_server) = fake_acp_thread(project, cx);

    // The handler keeps the turn open until this sender is dropped or used.
    let (end_turn_tx, end_turn_rx) = oneshot::channel::<()>();

    // Filled in by the handler with the id of the tool call it pushes, so the
    // test body can refer to that tool call afterwards.
    let tool_call_id = Rc::new(RefCell::new(None));
    let end_turn_rx = Rc::new(RefCell::new(Some(end_turn_rx)));
    fake_server.update(cx, |fake_server, _| {
        let tool_call_id = tool_call_id.clone();
        fake_server.on_user_message(move |_, server, mut cx| {
            let end_turn_rx = end_turn_rx.clone();
            let tool_call_id = tool_call_id.clone();
            async move {
                let tool_call_result = server
                    .update(&mut cx, |server, _| {
                        server.send_to_zed(acp_old::PushToolCallParams {
                            label: "Fetch".to_string(),
                            icon: acp_old::Icon::Globe,
                            content: None,
                            locations: vec![],
                        })
                    })?
                    .await
                    .unwrap();
                *tool_call_id.clone().borrow_mut() = Some(tool_call_result.id);
                // Hold the turn open until the test releases it; the result of
                // the wait is ignored.
                end_turn_rx.take().unwrap().await.ok();

                Ok(())
            }
        })
    });

    let request = thread.update(cx, |thread, cx| {
        thread.send_raw("Fetch https://example.com", cx)
    });

    run_until_first_tool_call(&thread, cx).await;

    // State 1: the pushed tool call is allowed and running.
    thread.read_with(cx, |thread, _| {
        assert!(matches!(
            thread.entries[1],
            AgentThreadEntry::ToolCall(ToolCall {
                status: ToolCallStatus::Allowed {
                    status: acp_old::ToolCallStatus::Running,
                    ..
                },
                ..
            })
        ));
    });

    cx.run_until_parked();

    thread
        .update(cx, |thread, cx| thread.cancel(cx))
        .await
        .unwrap();

    // State 2: after canceling, the tool call is marked as canceled.
    thread.read_with(cx, |thread, _| {
        assert!(matches!(
            &thread.entries[1],
            AgentThreadEntry::ToolCall(ToolCall {
                status: ToolCallStatus::Canceled,
                ..
            })
        ));
    });

    // The agent reports the canceled tool call as finished anyway.
    fake_server
        .update(cx, |fake_server, _| {
            fake_server.send_to_zed(acp_old::UpdateToolCallParams {
                tool_call_id: tool_call_id.borrow().unwrap(),
                status: acp_old::ToolCallStatus::Finished,
                content: None,
            })
        })
        .await
        .unwrap();

    // Dropping the sender ends the handler's wait on `end_turn_rx`, so the
    // turn can complete.
    drop(end_turn_tx);
    request.await.unwrap();

    // State 3: the late update won and the tool call is now finished.
    thread.read_with(cx, |thread, _| {
        assert!(matches!(
            thread.entries[1],
            AgentThreadEntry::ToolCall(ToolCall {
                status: ToolCallStatus::Allowed {
                    status: acp_old::ToolCallStatus::Finished,
                    ..
                },
                ..
            })
        ));
    });
}
1722
1723 async fn run_until_first_tool_call(
1724 thread: &Entity<AcpThread>,
1725 cx: &mut TestAppContext,
1726 ) -> usize {
1727 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
1728
1729 let subscription = cx.update(|cx| {
1730 cx.subscribe(thread, move |thread, _, cx| {
1731 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
1732 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
1733 return tx.try_send(ix).unwrap();
1734 }
1735 }
1736 })
1737 });
1738
1739 select! {
1740 _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
1741 panic!("Timeout waiting for tool call")
1742 }
1743 ix = rx.next().fuse() => {
1744 drop(subscription);
1745 ix.unwrap()
1746 }
1747 }
1748 }
1749
1750 pub fn fake_acp_thread(
1751 project: Entity<Project>,
1752 cx: &mut TestAppContext,
1753 ) -> (Entity<AcpThread>, Entity<FakeAcpServer>) {
1754 let (stdin_tx, stdin_rx) = async_pipe::pipe();
1755 let (stdout_tx, stdout_rx) = async_pipe::pipe();
1756
1757 let thread = cx.new(|cx| {
1758 let foreground_executor = cx.foreground_executor().clone();
1759 let (connection, io_fut) = acp_old::AgentConnection::connect_to_agent(
1760 OldAcpClientDelegate::new(cx.entity().downgrade(), cx.to_async()),
1761 stdin_tx,
1762 stdout_rx,
1763 move |fut| {
1764 foreground_executor.spawn(fut).detach();
1765 },
1766 );
1767
1768 let io_task = cx.background_spawn({
1769 async move {
1770 io_fut.await.log_err();
1771 Ok(())
1772 }
1773 });
1774 AcpThread::new(connection, "Test".into(), Some(io_task), project, cx)
1775 });
1776 let agent = cx.update(|cx| cx.new(|cx| FakeAcpServer::new(stdin_rx, stdout_tx, cx)));
1777 (thread, agent)
1778 }
1779
/// Test double for the agent side of a legacy-protocol connection.
///
/// Tests install a handler with `on_user_message` and use `send_to_zed` to
/// issue requests back to the client.
pub struct FakeAcpServer {
    /// Connection over which `send_to_zed` issues requests.
    connection: acp_old::ClientConnection,

    /// Task running the connection's I/O future, spawned in `new`.
    /// NOTE(review): presumably stored so the task is not dropped while the server exists — confirm.
    _io_task: Task<()>,
    /// Handler run for each user message received by `FakeAgent`; `None` until
    /// `on_user_message` is called.
    on_user_message: Option<
        Rc<
            dyn Fn(
                acp_old::SendUserMessageParams,
                Entity<FakeAcpServer>,
                AsyncApp,
            ) -> LocalBoxFuture<'static, Result<(), acp_old::Error>>,
        >,
    >,
}
1794
1795 #[derive(Clone)]
1796 struct FakeAgent {
1797 server: Entity<FakeAcpServer>,
1798 cx: AsyncApp,
1799 }
1800
1801 impl acp_old::Agent for FakeAgent {
1802 async fn initialize(
1803 &self,
1804 params: acp_old::InitializeParams,
1805 ) -> Result<acp_old::InitializeResponse, acp_old::Error> {
1806 Ok(acp_old::InitializeResponse {
1807 protocol_version: params.protocol_version,
1808 is_authenticated: true,
1809 })
1810 }
1811
1812 async fn authenticate(&self) -> Result<(), acp_old::Error> {
1813 Ok(())
1814 }
1815
1816 async fn cancel_send_message(&self) -> Result<(), acp_old::Error> {
1817 Ok(())
1818 }
1819
1820 async fn send_user_message(
1821 &self,
1822 request: acp_old::SendUserMessageParams,
1823 ) -> Result<(), acp_old::Error> {
1824 let mut cx = self.cx.clone();
1825 let handler = self
1826 .server
1827 .update(&mut cx, |server, _| server.on_user_message.clone())
1828 .ok()
1829 .flatten();
1830 if let Some(handler) = handler {
1831 handler(request, self.server.clone(), self.cx.clone()).await
1832 } else {
1833 Err(anyhow::anyhow!("No handler for on_user_message").into())
1834 }
1835 }
1836 }
1837
1838 impl FakeAcpServer {
1839 fn new(stdin: PipeReader, stdout: PipeWriter, cx: &Context<Self>) -> Self {
1840 let agent = FakeAgent {
1841 server: cx.entity(),
1842 cx: cx.to_async(),
1843 };
1844 let foreground_executor = cx.foreground_executor().clone();
1845
1846 let (connection, io_fut) = acp_old::ClientConnection::connect_to_client(
1847 agent.clone(),
1848 stdout,
1849 stdin,
1850 move |fut| {
1851 foreground_executor.spawn(fut).detach();
1852 },
1853 );
1854 FakeAcpServer {
1855 connection: connection,
1856 on_user_message: None,
1857 _io_task: cx.background_spawn(async move {
1858 io_fut.await.log_err();
1859 }),
1860 }
1861 }
1862
1863 fn on_user_message<F>(
1864 &mut self,
1865 handler: impl for<'a> Fn(
1866 acp_old::SendUserMessageParams,
1867 Entity<FakeAcpServer>,
1868 AsyncApp,
1869 ) -> F
1870 + 'static,
1871 ) where
1872 F: Future<Output = Result<(), acp_old::Error>> + 'static,
1873 {
1874 self.on_user_message
1875 .replace(Rc::new(move |request, server, cx| {
1876 handler(request, server, cx).boxed_local()
1877 }));
1878 }
1879
1880 fn send_to_zed<T: acp_old::ClientRequest + 'static>(
1881 &self,
1882 message: T,
1883 ) -> BoxedLocal<Result<T::Response>> {
1884 self.connection
1885 .request(message)
1886 .map(|f| f.map_err(|err| anyhow!(err)))
1887 .boxed_local()
1888 }
1889 }
1890}