1mod connection;
2pub use connection::*;
3
4use agent_client_protocol as acp;
5use agentic_coding_protocol as acp_old;
6use anyhow::{Context as _, Result};
7use assistant_tool::ActionLog;
8use buffer_diff::BufferDiff;
9use editor::{Bias, MultiBuffer, PathKey};
10use futures::{FutureExt, channel::oneshot, future::BoxFuture};
11use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
12use itertools::Itertools;
13use language::{
14 Anchor, Buffer, BufferSnapshot, Capability, LanguageRegistry, OffsetRangeExt as _, Point,
15 text_diff,
16};
17use markdown::Markdown;
18use project::{AgentLocation, Project};
19use std::cell::RefCell;
20use std::collections::HashMap;
21use std::error::Error;
22use std::fmt::Formatter;
23use std::rc::Rc;
24use std::{
25 fmt::Display,
26 mem,
27 path::{Path, PathBuf},
28 sync::Arc,
29};
30use ui::App;
31use util::ResultExt;
32
33#[derive(Debug)]
34pub struct UserMessage {
35 pub content: ContentBlock,
36}
37
38impl UserMessage {
39 pub fn from_acp(
40 message: impl IntoIterator<Item = acp::ContentBlock>,
41 language_registry: Arc<LanguageRegistry>,
42 cx: &mut App,
43 ) -> Self {
44 let mut content = ContentBlock::Empty;
45 for chunk in message {
46 content.append(chunk, &language_registry, cx)
47 }
48 Self { content: content }
49 }
50
51 fn to_markdown(&self, cx: &App) -> String {
52 format!("## User\n{}\n", self.content.to_markdown(cx))
53 }
54}
55
/// A borrowed file path that is displayed as a Markdown mention link.
///
/// The `Display` output has the form `[@<file name>](@file:<path>)`.
#[derive(Debug)]
pub struct MentionPath<'a>(&'a Path);

impl<'a> MentionPath<'a> {
    /// Prefix that marks a link target as a file mention.
    const PREFIX: &'static str = "@file:";

    /// Wraps `path` without copying it.
    pub fn new(path: &'a Path) -> Self {
        Self(path)
    }

    /// Parses a link target of the form `@file:<path>`.
    ///
    /// Returns `None` when `url` does not start with the mention prefix.
    pub fn try_parse(url: &'a str) -> Option<Self> {
        url.strip_prefix(Self::PREFIX)
            .map(|rest| Self(Path::new(rest)))
    }

    /// The wrapped path.
    pub fn path(&self) -> &Path {
        self.0
    }
}

impl Display for MentionPath<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A path without a final component is labelled with an empty name.
        let file_name = self.0.file_name().unwrap_or_default();
        write!(
            f,
            "[@{}]({}{})",
            file_name.display(),
            Self::PREFIX,
            self.0.display()
        )
    }
}
87
88#[derive(Debug, PartialEq)]
89pub struct AssistantMessage {
90 pub chunks: Vec<AssistantMessageChunk>,
91}
92
93impl AssistantMessage {
94 pub fn to_markdown(&self, cx: &App) -> String {
95 format!(
96 "## Assistant\n\n{}\n\n",
97 self.chunks
98 .iter()
99 .map(|chunk| chunk.to_markdown(cx))
100 .join("\n\n")
101 )
102 }
103}
104
105#[derive(Debug, PartialEq)]
106pub enum AssistantMessageChunk {
107 Message { block: ContentBlock },
108 Thought { block: ContentBlock },
109}
110
111impl AssistantMessageChunk {
112 pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
113 Self::Message {
114 block: ContentBlock::new(chunk.into(), language_registry, cx),
115 }
116 }
117
118 fn to_markdown(&self, cx: &App) -> String {
119 match self {
120 Self::Message { block } => block.to_markdown(cx).to_string(),
121 Self::Thought { block } => {
122 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
123 }
124 }
125 }
126}
127
128#[derive(Debug)]
129pub enum AgentThreadEntry {
130 UserMessage(UserMessage),
131 AssistantMessage(AssistantMessage),
132 ToolCall(ToolCall),
133}
134
135impl AgentThreadEntry {
136 fn to_markdown(&self, cx: &App) -> String {
137 match self {
138 Self::UserMessage(message) => message.to_markdown(cx),
139 Self::AssistantMessage(message) => message.to_markdown(cx),
140 Self::ToolCall(too_call) => too_call.to_markdown(cx),
141 }
142 }
143
144 // todo! return all diffs?
145 pub fn first_diff(&self) -> Option<&Diff> {
146 if let AgentThreadEntry::ToolCall(call) = self {
147 call.first_diff()
148 } else {
149 None
150 }
151 }
152
153 pub fn locations(&self) -> Option<&[acp::ToolCallLocation]> {
154 if let AgentThreadEntry::ToolCall(ToolCall { locations, .. }) = self {
155 Some(locations)
156 } else {
157 None
158 }
159 }
160}
161
162#[derive(Debug)]
163pub struct ToolCall {
164 pub id: acp::ToolCallId,
165 pub label: Entity<Markdown>,
166 pub kind: acp::ToolKind,
167 pub content: Vec<ToolCallContent>,
168 pub status: ToolCallStatus,
169 pub locations: Vec<acp::ToolCallLocation>,
170}
171
172impl ToolCall {
173 fn from_acp(
174 tool_call: acp::ToolCall,
175 status: ToolCallStatus,
176 language_registry: Arc<LanguageRegistry>,
177 cx: &mut App,
178 ) -> Self {
179 Self {
180 id: tool_call.id,
181 label: cx.new(|cx| {
182 Markdown::new(
183 tool_call.label.into(),
184 Some(language_registry.clone()),
185 None,
186 cx,
187 )
188 }),
189 kind: tool_call.kind,
190 content: tool_call
191 .content
192 .into_iter()
193 .map(|content| ToolCallContent::from_acp(content, language_registry.clone(), cx))
194 .collect(),
195 locations: tool_call.locations,
196 status,
197 }
198 }
199
200 pub fn first_diff(&self) -> Option<&Diff> {
201 self.content.iter().find_map(|content| match content {
202 ToolCallContent::ContentBlock { .. } => None,
203 ToolCallContent::Diff { diff } => Some(diff),
204 })
205 }
206
207 fn to_markdown(&self, cx: &App) -> String {
208 let mut markdown = format!(
209 "**Tool Call: {}**\nStatus: {}\n\n",
210 self.label.read(cx).source(),
211 self.status
212 );
213 for content in &self.content {
214 markdown.push_str(content.to_markdown(cx).as_str());
215 markdown.push_str("\n\n");
216 }
217 markdown
218 }
219}
220
221#[derive(Debug)]
222pub enum ToolCallStatus {
223 WaitingForConfirmation {
224 options: Vec<acp::PermissionOption>,
225 respond_tx: oneshot::Sender<acp::PermissionOptionId>,
226 },
227 Allowed {
228 status: acp::ToolCallStatus,
229 },
230 Rejected,
231 Canceled,
232}
233
234impl Display for ToolCallStatus {
235 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
236 write!(
237 f,
238 "{}",
239 match self {
240 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
241 ToolCallStatus::Allowed { status } => match status {
242 acp::ToolCallStatus::InProgress => "In Progress",
243 acp::ToolCallStatus::Completed => "Completed",
244 acp::ToolCallStatus::Failed => "Failed",
245 },
246 ToolCallStatus::Rejected => "Rejected",
247 ToolCallStatus::Canceled => "Canceled",
248 }
249 )
250 }
251}
252
253#[derive(Debug, PartialEq, Clone)]
254pub enum ContentBlock {
255 Empty,
256 Markdown { markdown: Entity<Markdown> },
257}
258
259impl ContentBlock {
260 pub fn new(
261 block: acp::ContentBlock,
262 language_registry: &Arc<LanguageRegistry>,
263 cx: &mut App,
264 ) -> Self {
265 let mut this = Self::Empty;
266 this.append(block, language_registry, cx);
267 this
268 }
269
270 pub fn new_combined(
271 blocks: impl IntoIterator<Item = acp::ContentBlock>,
272 language_registry: Arc<LanguageRegistry>,
273 cx: &mut App,
274 ) -> Self {
275 let mut this = Self::Empty;
276 for block in blocks {
277 this.append(block, &language_registry, cx);
278 }
279 this
280 }
281
282 pub fn append(
283 &mut self,
284 block: acp::ContentBlock,
285 language_registry: &Arc<LanguageRegistry>,
286 cx: &mut App,
287 ) {
288 let new_content = match block {
289 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
290 acp::ContentBlock::ResourceLink(resource_link) => {
291 if let Some(path) = resource_link.uri.strip_prefix("file://") {
292 format!("{}", MentionPath(path.as_ref()))
293 } else {
294 resource_link.uri.clone()
295 }
296 }
297 acp::ContentBlock::Image(_)
298 | acp::ContentBlock::Audio(_)
299 | acp::ContentBlock::Resource(_) => String::new(),
300 };
301
302 match self {
303 ContentBlock::Empty => {
304 *self = ContentBlock::Markdown {
305 markdown: cx.new(|cx| {
306 Markdown::new(
307 new_content.into(),
308 Some(language_registry.clone()),
309 None,
310 cx,
311 )
312 }),
313 };
314 }
315 ContentBlock::Markdown { markdown } => {
316 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
317 }
318 }
319 }
320
321 fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
322 match self {
323 ContentBlock::Empty => "",
324 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
325 }
326 }
327
328 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
329 match self {
330 ContentBlock::Empty => None,
331 ContentBlock::Markdown { markdown } => Some(markdown),
332 }
333 }
334}
335
336#[derive(Debug)]
337pub enum ToolCallContent {
338 ContentBlock { content: ContentBlock },
339 Diff { diff: Diff },
340}
341
342impl ToolCallContent {
343 pub fn from_acp(
344 content: acp::ToolCallContent,
345 language_registry: Arc<LanguageRegistry>,
346 cx: &mut App,
347 ) -> Self {
348 match content {
349 acp::ToolCallContent::ContentBlock { content } => Self::ContentBlock {
350 content: ContentBlock::new(content, &language_registry, cx),
351 },
352 acp::ToolCallContent::Diff { diff } => Self::Diff {
353 diff: Diff::from_acp(diff, language_registry, cx),
354 },
355 }
356 }
357
358 pub fn to_markdown(&self, cx: &App) -> String {
359 match self {
360 Self::ContentBlock { content } => content.to_markdown(cx).to_string(),
361 Self::Diff { diff } => diff.to_markdown(cx),
362 }
363 }
364}
365
366#[derive(Debug)]
367pub struct Diff {
368 pub multibuffer: Entity<MultiBuffer>,
369 pub path: PathBuf,
370 pub new_buffer: Entity<Buffer>,
371 pub old_buffer: Entity<Buffer>,
372 _task: Task<Result<()>>,
373}
374
375impl Diff {
376 pub fn from_acp(
377 diff: acp::Diff,
378 language_registry: Arc<LanguageRegistry>,
379 cx: &mut App,
380 ) -> Self {
381 let acp::Diff {
382 path,
383 old_text,
384 new_text,
385 } = diff;
386
387 let multibuffer = cx.new(|_cx| MultiBuffer::without_headers(Capability::ReadOnly));
388
389 let new_buffer = cx.new(|cx| Buffer::local(new_text, cx));
390 let old_buffer = cx.new(|cx| Buffer::local(old_text.unwrap_or("".into()), cx));
391 let new_buffer_snapshot = new_buffer.read(cx).text_snapshot();
392 let old_buffer_snapshot = old_buffer.read(cx).snapshot();
393 let buffer_diff = cx.new(|cx| BufferDiff::new(&new_buffer_snapshot, cx));
394 let diff_task = buffer_diff.update(cx, |diff, cx| {
395 diff.set_base_text(
396 old_buffer_snapshot,
397 Some(language_registry.clone()),
398 new_buffer_snapshot,
399 cx,
400 )
401 });
402
403 let task = cx.spawn({
404 let multibuffer = multibuffer.clone();
405 let path = path.clone();
406 let new_buffer = new_buffer.clone();
407 async move |cx| {
408 diff_task.await?;
409
410 multibuffer
411 .update(cx, |multibuffer, cx| {
412 let hunk_ranges = {
413 let buffer = new_buffer.read(cx);
414 let diff = buffer_diff.read(cx);
415 diff.hunks_intersecting_range(Anchor::MIN..Anchor::MAX, &buffer, cx)
416 .map(|diff_hunk| diff_hunk.buffer_range.to_point(&buffer))
417 .collect::<Vec<_>>()
418 };
419
420 multibuffer.set_excerpts_for_path(
421 PathKey::for_buffer(&new_buffer, cx),
422 new_buffer.clone(),
423 hunk_ranges,
424 editor::DEFAULT_MULTIBUFFER_CONTEXT,
425 cx,
426 );
427 multibuffer.add_diff(buffer_diff.clone(), cx);
428 })
429 .log_err();
430
431 if let Some(language) = language_registry
432 .language_for_file_path(&path)
433 .await
434 .log_err()
435 {
436 new_buffer.update(cx, |buffer, cx| buffer.set_language(Some(language), cx))?;
437 }
438
439 anyhow::Ok(())
440 }
441 });
442
443 Self {
444 multibuffer,
445 path,
446 new_buffer,
447 old_buffer,
448 _task: task,
449 }
450 }
451
452 fn to_markdown(&self, cx: &App) -> String {
453 let buffer_text = self
454 .multibuffer
455 .read(cx)
456 .all_buffers()
457 .iter()
458 .map(|buffer| buffer.read(cx).text())
459 .join("\n");
460 format!("Diff: {}\n```\n{}\n```\n", self.path.display(), buffer_text)
461 }
462}
463
464#[derive(Debug, Default)]
465pub struct Plan {
466 pub entries: Vec<PlanEntry>,
467}
468
469#[derive(Debug)]
470pub struct PlanStats<'a> {
471 pub in_progress_entry: Option<&'a PlanEntry>,
472 pub pending: u32,
473 pub completed: u32,
474}
475
476impl Plan {
477 pub fn is_empty(&self) -> bool {
478 self.entries.is_empty()
479 }
480
481 pub fn stats(&self) -> PlanStats<'_> {
482 let mut stats = PlanStats {
483 in_progress_entry: None,
484 pending: 0,
485 completed: 0,
486 };
487
488 for entry in &self.entries {
489 match &entry.status {
490 acp::PlanEntryStatus::Pending => {
491 stats.pending += 1;
492 }
493 acp::PlanEntryStatus::InProgress => {
494 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
495 }
496 acp::PlanEntryStatus::Completed => {
497 stats.completed += 1;
498 }
499 }
500 }
501
502 stats
503 }
504}
505
506#[derive(Debug)]
507pub struct PlanEntry {
508 pub content: Entity<Markdown>,
509 pub priority: acp::PlanEntryPriority,
510 pub status: acp::PlanEntryStatus,
511}
512
513impl PlanEntry {
514 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
515 Self {
516 content: cx.new(|cx| Markdown::new_text(entry.content.into(), cx)),
517 priority: entry.priority,
518 status: entry.status,
519 }
520 }
521}
522
/// State of one conversation with an agent over an ACP connection.
pub struct AcpThread {
    /// Title returned by `title()`.
    title: SharedString,
    /// Entries in the order they were pushed.
    entries: Vec<AgentThreadEntry>,
    /// Most recent plan received through `update_plan`.
    plan: Plan,
    project: Entity<Project>,
    /// Action log created from `project` in `new`.
    action_log: Entity<ActionLog>,
    /// Snapshot of each buffer as last returned by `read_text_file`;
    /// `write_text_file` diffs new content against it.
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    /// Task of the prompt currently in flight; `None` means the thread is idle.
    send_task: Option<Task<()>>,
    connection: Rc<dyn AgentConnection>,
    /// Handed out once by `child_status()`.
    child_status: Option<Task<Result<()>>>,
    /// Session id sent with every prompt.
    session_id: acp::SessionId,
}
535
/// Events emitted by [`AcpThread`] when its entry list changes.
pub enum AcpThreadEvent {
    /// An entry was appended to the thread.
    NewEntry,
    /// The entry at the given index was modified.
    EntryUpdated(usize),
}

impl EventEmitter<AcpThreadEvent> for AcpThread {}
542
/// Coarse activity state of a thread, as computed by `AcpThread::status`.
#[derive(PartialEq, Eq)]
pub enum ThreadStatus {
    /// No send task is in flight.
    Idle,
    /// A send task is in flight and a tool call of the current turn awaits confirmation.
    WaitingForToolConfirmation,
    /// A send task is in flight and nothing awaits confirmation.
    Generating,
}
549
550#[derive(Debug, Clone)]
551pub enum LoadError {
552 Unsupported {
553 error_message: SharedString,
554 upgrade_message: SharedString,
555 upgrade_command: String,
556 },
557 Exited(i32),
558 Other(SharedString),
559}
560
561impl Display for LoadError {
562 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
563 match self {
564 LoadError::Unsupported { error_message, .. } => write!(f, "{}", error_message),
565 LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
566 LoadError::Other(msg) => write!(f, "{}", msg),
567 }
568 }
569}
570
571impl Error for LoadError {}
572
573impl AcpThread {
574 pub fn new(
575 connection: Rc<dyn AgentConnection>,
576 // todo! remove me
577 title: SharedString,
578 // todo! remove this?
579 child_status: Option<Task<Result<()>>>,
580 project: Entity<Project>,
581 session_id: acp::SessionId,
582 cx: &mut Context<Self>,
583 ) -> Self {
584 let action_log = cx.new(|_| ActionLog::new(project.clone()));
585
586 Self {
587 action_log,
588 shared_buffers: Default::default(),
589 entries: Default::default(),
590 plan: Default::default(),
591 title,
592 project,
593 send_task: None,
594 connection,
595 child_status,
596 session_id,
597 }
598 }
599
    /// The action log created for this thread's project.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }

    /// The project this thread operates on.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }

    /// A clone of the thread's title.
    pub fn title(&self) -> SharedString {
        self.title.clone()
    }

    /// All entries, in the order they were pushed.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
615
616 pub fn status(&self) -> ThreadStatus {
617 if self.send_task.is_some() {
618 if self.waiting_for_tool_confirmation() {
619 ThreadStatus::WaitingForToolConfirmation
620 } else {
621 ThreadStatus::Generating
622 }
623 } else {
624 ThreadStatus::Idle
625 }
626 }
627
628 pub fn has_pending_edit_tool_calls(&self) -> bool {
629 for entry in self.entries.iter().rev() {
630 match entry {
631 AgentThreadEntry::UserMessage(_) => return false,
632 AgentThreadEntry::ToolCall(call) if call.first_diff().is_some() => return true,
633 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
634 }
635 }
636
637 false
638 }
639
    /// Appends `entry` to the thread and emits [`AcpThreadEvent::NewEntry`].
    pub fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
644
645 pub fn push_assistant_chunk(
646 &mut self,
647 chunk: acp::ContentBlock,
648 is_thought: bool,
649 cx: &mut Context<Self>,
650 ) {
651 let language_registry = self.project.read(cx).languages().clone();
652 let entries_len = self.entries.len();
653 if let Some(last_entry) = self.entries.last_mut()
654 && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
655 {
656 cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
657 match (chunks.last_mut(), is_thought) {
658 (Some(AssistantMessageChunk::Message { block }), false)
659 | (Some(AssistantMessageChunk::Thought { block }), true) => {
660 block.append(chunk, &language_registry, cx)
661 }
662 _ => {
663 let block = ContentBlock::new(chunk, &language_registry, cx);
664 if is_thought {
665 chunks.push(AssistantMessageChunk::Thought { block })
666 } else {
667 chunks.push(AssistantMessageChunk::Message { block })
668 }
669 }
670 }
671 } else {
672 let block = ContentBlock::new(chunk, &language_registry, cx);
673 let chunk = if is_thought {
674 AssistantMessageChunk::Thought { block }
675 } else {
676 AssistantMessageChunk::Message { block }
677 };
678
679 self.push_entry(
680 AgentThreadEntry::AssistantMessage(AssistantMessage {
681 chunks: vec![chunk],
682 }),
683 cx,
684 );
685 }
686 }
687
688 pub fn update_tool_call(
689 &mut self,
690 id: acp::ToolCallId,
691 status: acp::ToolCallStatus,
692 content: Option<Vec<acp::ToolCallContent>>,
693 cx: &mut Context<Self>,
694 ) -> Result<()> {
695 let languages = self.project.read(cx).languages().clone();
696 let (ix, current_call) = self.tool_call_mut(&id).context("Tool call not found")?;
697
698 if let Some(content) = content {
699 current_call.content = content
700 .into_iter()
701 .map(|chunk| ToolCallContent::from_acp(chunk, languages.clone(), cx))
702 .collect();
703 }
704 current_call.status = ToolCallStatus::Allowed { status };
705
706 cx.emit(AcpThreadEvent::EntryUpdated(ix));
707
708 Ok(())
709 }
710
711 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
712 pub fn upsert_tool_call(&mut self, tool_call: acp::ToolCall, cx: &mut Context<Self>) {
713 let status = ToolCallStatus::Allowed {
714 status: tool_call.status,
715 };
716 self.upsert_tool_call_inner(tool_call, status, cx)
717 }
718
719 pub fn upsert_tool_call_inner(
720 &mut self,
721 tool_call: acp::ToolCall,
722 status: ToolCallStatus,
723 cx: &mut Context<Self>,
724 ) {
725 let language_registry = self.project.read(cx).languages().clone();
726 let call = ToolCall::from_acp(tool_call, status, language_registry, cx);
727
728 let location = call.locations.last().cloned();
729
730 if let Some((ix, current_call)) = self.tool_call_mut(&call.id) {
731 *current_call = call;
732
733 cx.emit(AcpThreadEvent::EntryUpdated(ix));
734 } else {
735 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
736 }
737
738 if let Some(location) = location {
739 self.set_project_location(location, cx)
740 }
741 }
742
743 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
744 // todo! use map
745 self.entries
746 .iter_mut()
747 .enumerate()
748 .rev()
749 .find_map(|(index, tool_call)| {
750 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
751 && &tool_call.id == id
752 {
753 Some((index, tool_call))
754 } else {
755 None
756 }
757 })
758 }
759
760 pub fn request_tool_call_permission(
761 &mut self,
762 tool_call: acp::ToolCall,
763 options: Vec<acp::PermissionOption>,
764 cx: &mut Context<Self>,
765 ) -> oneshot::Receiver<acp::PermissionOptionId> {
766 let (tx, rx) = oneshot::channel();
767
768 let status = ToolCallStatus::WaitingForConfirmation {
769 options,
770 respond_tx: tx,
771 };
772
773 self.upsert_tool_call_inner(tool_call, status, cx);
774 rx
775 }
776
777 pub fn authorize_tool_call(
778 &mut self,
779 id: acp::ToolCallId,
780 option_id: acp::PermissionOptionId,
781 option_kind: acp::PermissionOptionKind,
782 cx: &mut Context<Self>,
783 ) {
784 let Some((ix, call)) = self.tool_call_mut(&id) else {
785 return;
786 };
787
788 let new_status = match option_kind {
789 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
790 ToolCallStatus::Rejected
791 }
792 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
793 ToolCallStatus::Allowed {
794 status: acp::ToolCallStatus::InProgress,
795 }
796 }
797 };
798
799 let curr_status = mem::replace(&mut call.status, new_status);
800
801 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
802 respond_tx.send(option_id).log_err();
803 } else if cfg!(debug_assertions) {
804 panic!("tried to authorize an already authorized tool call");
805 }
806
807 cx.emit(AcpThreadEvent::EntryUpdated(ix));
808 }
809
810 pub fn plan(&self) -> &Plan {
811 &self.plan
812 }
813
814 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
815 self.plan = Plan {
816 entries: request
817 .entries
818 .into_iter()
819 .map(|entry| PlanEntry::from_acp(entry, cx))
820 .collect(),
821 };
822
823 cx.notify();
824 }
825
826 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
827 self.plan
828 .entries
829 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
830 cx.notify();
831 }
832
833 pub fn set_project_location(&self, location: acp::ToolCallLocation, cx: &mut Context<Self>) {
834 self.project.update(cx, |project, cx| {
835 let Some(path) = project.project_path_for_absolute_path(&location.path, cx) else {
836 return;
837 };
838 let buffer = project.open_buffer(path, cx);
839 cx.spawn(async move |project, cx| {
840 let buffer = buffer.await?;
841
842 project.update(cx, |project, cx| {
843 let position = if let Some(line) = location.line {
844 let snapshot = buffer.read(cx).snapshot();
845 let point = snapshot.clip_point(Point::new(line, 0), Bias::Left);
846 snapshot.anchor_before(point)
847 } else {
848 Anchor::MIN
849 };
850
851 project.set_agent_location(
852 Some(AgentLocation {
853 buffer: buffer.downgrade(),
854 position,
855 }),
856 cx,
857 );
858 })
859 })
860 .detach_and_log_err(cx);
861 });
862 }
863
864 /// Returns true if the last turn is awaiting tool authorization
865 pub fn waiting_for_tool_confirmation(&self) -> bool {
866 for entry in self.entries.iter().rev() {
867 match &entry {
868 AgentThreadEntry::ToolCall(call) => match call.status {
869 ToolCallStatus::WaitingForConfirmation { .. } => return true,
870 ToolCallStatus::Allowed { .. }
871 | ToolCallStatus::Rejected
872 | ToolCallStatus::Canceled => continue,
873 },
874 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
875 // Reached the beginning of the turn
876 return false;
877 }
878 }
879 }
880 false
881 }
882
    /// Asks the agent connection to authenticate and returns the resulting future.
    pub fn authenticate(&self, cx: &mut App) -> impl use<> + Future<Output = Result<()>> {
        self.connection.authenticate(cx)
    }
886
887 #[cfg(any(test, feature = "test-support"))]
888 pub fn send_raw(
889 &mut self,
890 message: &str,
891 cx: &mut Context<Self>,
892 ) -> BoxFuture<'static, Result<()>> {
893 self.send(
894 vec![acp::ContentBlock::Text(acp::TextContent {
895 text: message.to_string(),
896 annotations: None,
897 })],
898 cx,
899 )
900 }
901
902 pub fn send(
903 &mut self,
904 message: Vec<acp::ContentBlock>,
905 cx: &mut Context<Self>,
906 ) -> BoxFuture<'static, Result<()>> {
907 let block = ContentBlock::new_combined(
908 message.clone(),
909 self.project.read(cx).languages().clone(),
910 cx,
911 );
912 self.push_entry(
913 AgentThreadEntry::UserMessage(UserMessage { content: block }),
914 cx,
915 );
916 self.clear_completed_plan_entries(cx);
917
918 let (tx, rx) = oneshot::channel();
919 let cancel_task = self.cancel(cx);
920
921 self.send_task = Some(cx.spawn(async move |this, cx| {
922 async {
923 cancel_task.await;
924
925 let result = this
926 .update(cx, |this, cx| {
927 this.connection.prompt(
928 acp::PromptToolArguments {
929 prompt: message,
930 session_id: this.session_id.clone(),
931 },
932 cx,
933 )
934 })?
935 .await;
936 tx.send(result).log_err();
937 this.update(cx, |this, _cx| this.send_task.take())?;
938 anyhow::Ok(())
939 }
940 .await
941 .log_err();
942 }));
943
944 async move {
945 match rx.await {
946 Ok(Err(e)) => Err(e)?,
947 _ => Ok(()),
948 }
949 }
950 .boxed()
951 }
952
953 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
954 let Some(send_task) = self.send_task.take() else {
955 return Task::ready(());
956 };
957
958 for entry in self.entries.iter_mut() {
959 if let AgentThreadEntry::ToolCall(call) = entry {
960 let cancel = matches!(
961 call.status,
962 ToolCallStatus::WaitingForConfirmation { .. }
963 | ToolCallStatus::Allowed {
964 status: acp::ToolCallStatus::InProgress
965 }
966 );
967
968 if cancel {
969 call.status = ToolCallStatus::Canceled;
970 }
971 }
972 }
973
974 self.connection.cancel(cx);
975
976 // Wait for the send task to complete
977 cx.foreground_executor().spawn(send_task)
978 }
979
980 pub fn read_text_file(
981 &self,
982 path: PathBuf,
983 line: Option<u32>,
984 limit: Option<u32>,
985 reuse_shared_snapshot: bool,
986 cx: &mut Context<Self>,
987 ) -> Task<Result<String>> {
988 let project = self.project.clone();
989 let action_log = self.action_log.clone();
990 cx.spawn(async move |this, cx| {
991 let load = project.update(cx, |project, cx| {
992 let path = project
993 .project_path_for_absolute_path(&path, cx)
994 .context("invalid path")?;
995 anyhow::Ok(project.open_buffer(path, cx))
996 });
997 let buffer = load??.await?;
998
999 let snapshot = if reuse_shared_snapshot {
1000 this.read_with(cx, |this, _| {
1001 this.shared_buffers.get(&buffer.clone()).cloned()
1002 })
1003 .log_err()
1004 .flatten()
1005 } else {
1006 None
1007 };
1008
1009 let snapshot = if let Some(snapshot) = snapshot {
1010 snapshot
1011 } else {
1012 action_log.update(cx, |action_log, cx| {
1013 action_log.buffer_read(buffer.clone(), cx);
1014 })?;
1015 project.update(cx, |project, cx| {
1016 let position = buffer
1017 .read(cx)
1018 .snapshot()
1019 .anchor_before(Point::new(line.unwrap_or_default(), 0));
1020 project.set_agent_location(
1021 Some(AgentLocation {
1022 buffer: buffer.downgrade(),
1023 position,
1024 }),
1025 cx,
1026 );
1027 })?;
1028
1029 buffer.update(cx, |buffer, _| buffer.snapshot())?
1030 };
1031
1032 this.update(cx, |this, _| {
1033 let text = snapshot.text();
1034 this.shared_buffers.insert(buffer.clone(), snapshot);
1035 if line.is_none() && limit.is_none() {
1036 return Ok(text);
1037 }
1038 let limit = limit.unwrap_or(u32::MAX) as usize;
1039 let Some(line) = line else {
1040 return Ok(text.lines().take(limit).collect::<String>());
1041 };
1042
1043 let count = text.lines().count();
1044 if count < line as usize {
1045 anyhow::bail!("There are only {} lines", count);
1046 }
1047 Ok(text
1048 .lines()
1049 .skip(line as usize + 1)
1050 .take(limit)
1051 .collect::<String>())
1052 })?
1053 })
1054 }
1055
1056 pub fn write_text_file(
1057 &self,
1058 path: PathBuf,
1059 content: String,
1060 cx: &mut Context<Self>,
1061 ) -> Task<Result<()>> {
1062 let project = self.project.clone();
1063 let action_log = self.action_log.clone();
1064 cx.spawn(async move |this, cx| {
1065 let load = project.update(cx, |project, cx| {
1066 let path = project
1067 .project_path_for_absolute_path(&path, cx)
1068 .context("invalid path")?;
1069 anyhow::Ok(project.open_buffer(path, cx))
1070 });
1071 let buffer = load??.await?;
1072 let snapshot = this.update(cx, |this, cx| {
1073 this.shared_buffers
1074 .get(&buffer)
1075 .cloned()
1076 .unwrap_or_else(|| buffer.read(cx).snapshot())
1077 })?;
1078 let edits = cx
1079 .background_executor()
1080 .spawn(async move {
1081 let old_text = snapshot.text();
1082 text_diff(old_text.as_str(), &content)
1083 .into_iter()
1084 .map(|(range, replacement)| {
1085 (
1086 snapshot.anchor_after(range.start)
1087 ..snapshot.anchor_before(range.end),
1088 replacement,
1089 )
1090 })
1091 .collect::<Vec<_>>()
1092 })
1093 .await;
1094 cx.update(|cx| {
1095 project.update(cx, |project, cx| {
1096 project.set_agent_location(
1097 Some(AgentLocation {
1098 buffer: buffer.downgrade(),
1099 position: edits
1100 .last()
1101 .map(|(range, _)| range.end)
1102 .unwrap_or(Anchor::MIN),
1103 }),
1104 cx,
1105 );
1106 });
1107
1108 action_log.update(cx, |action_log, cx| {
1109 action_log.buffer_read(buffer.clone(), cx);
1110 });
1111 buffer.update(cx, |buffer, cx| {
1112 buffer.edit(edits, None, cx);
1113 });
1114 action_log.update(cx, |action_log, cx| {
1115 action_log.buffer_edited(buffer.clone(), cx);
1116 });
1117 })?;
1118 project
1119 .update(cx, |project, cx| project.save_buffer(buffer, cx))?
1120 .await
1121 })
1122 }
1123
1124 pub fn child_status(&mut self) -> Option<Task<Result<()>>> {
1125 self.child_status.take()
1126 }
1127
1128 pub fn to_markdown(&self, cx: &App) -> String {
1129 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
1130 }
1131}
1132
1133#[derive(Clone)]
1134pub struct OldAcpClientDelegate {
1135 thread: Rc<RefCell<WeakEntity<AcpThread>>>,
1136 cx: AsyncApp,
1137 next_tool_call_id: Rc<RefCell<u64>>,
1138 // sent_buffer_versions: HashMap<Entity<Buffer>, HashMap<u64, BufferSnapshot>>,
1139}
1140
1141impl OldAcpClientDelegate {
1142 pub fn new(thread: Rc<RefCell<WeakEntity<AcpThread>>>, cx: AsyncApp) -> Self {
1143 Self {
1144 thread,
1145 cx,
1146 next_tool_call_id: Rc::new(RefCell::new(0)),
1147 }
1148 }
1149}
1150
1151impl acp_old::Client for OldAcpClientDelegate {
1152 async fn stream_assistant_message_chunk(
1153 &self,
1154 params: acp_old::StreamAssistantMessageChunkParams,
1155 ) -> Result<(), acp_old::Error> {
1156 let cx = &mut self.cx.clone();
1157
1158 cx.update(|cx| {
1159 self.thread
1160 .borrow()
1161 .update(cx, |thread, cx| match params.chunk {
1162 acp_old::AssistantMessageChunk::Text { text } => {
1163 thread.push_assistant_chunk(text.into(), false, cx)
1164 }
1165 acp_old::AssistantMessageChunk::Thought { thought } => {
1166 thread.push_assistant_chunk(thought.into(), true, cx)
1167 }
1168 })
1169 .ok();
1170 })?;
1171
1172 Ok(())
1173 }
1174
1175 async fn request_tool_call_confirmation(
1176 &self,
1177 request: acp_old::RequestToolCallConfirmationParams,
1178 ) -> Result<acp_old::RequestToolCallConfirmationResponse, acp_old::Error> {
1179 let cx = &mut self.cx.clone();
1180
1181 let old_acp_id = *self.next_tool_call_id.borrow() + 1;
1182 self.next_tool_call_id.replace(old_acp_id);
1183
1184 let tool_call = into_new_tool_call(
1185 acp::ToolCallId(old_acp_id.to_string().into()),
1186 request.tool_call,
1187 );
1188
1189 let mut options = match request.confirmation {
1190 acp_old::ToolCallConfirmation::Edit { .. } => vec![(
1191 acp_old::ToolCallConfirmationOutcome::AlwaysAllow,
1192 acp::PermissionOptionKind::AllowAlways,
1193 "Always Allow Edits".to_string(),
1194 )],
1195 acp_old::ToolCallConfirmation::Execute { root_command, .. } => vec![(
1196 acp_old::ToolCallConfirmationOutcome::AlwaysAllow,
1197 acp::PermissionOptionKind::AllowAlways,
1198 format!("Always Allow {}", root_command),
1199 )],
1200 acp_old::ToolCallConfirmation::Mcp {
1201 server_name,
1202 tool_name,
1203 ..
1204 } => vec![
1205 (
1206 acp_old::ToolCallConfirmationOutcome::AlwaysAllowMcpServer,
1207 acp::PermissionOptionKind::AllowAlways,
1208 format!("Always Allow {}", server_name),
1209 ),
1210 (
1211 acp_old::ToolCallConfirmationOutcome::AlwaysAllowTool,
1212 acp::PermissionOptionKind::AllowAlways,
1213 format!("Always Allow {}", tool_name),
1214 ),
1215 ],
1216 acp_old::ToolCallConfirmation::Fetch { .. } => vec![(
1217 acp_old::ToolCallConfirmationOutcome::AlwaysAllow,
1218 acp::PermissionOptionKind::AllowAlways,
1219 "Always Allow".to_string(),
1220 )],
1221 acp_old::ToolCallConfirmation::Other { .. } => vec![(
1222 acp_old::ToolCallConfirmationOutcome::AlwaysAllow,
1223 acp::PermissionOptionKind::AllowAlways,
1224 "Always Allow".to_string(),
1225 )],
1226 };
1227
1228 options.extend([
1229 (
1230 acp_old::ToolCallConfirmationOutcome::Allow,
1231 acp::PermissionOptionKind::AllowOnce,
1232 "Allow".to_string(),
1233 ),
1234 (
1235 acp_old::ToolCallConfirmationOutcome::Reject,
1236 acp::PermissionOptionKind::RejectOnce,
1237 "Reject".to_string(),
1238 ),
1239 ]);
1240
1241 let mut outcomes = Vec::with_capacity(options.len());
1242 let mut acp_options = Vec::with_capacity(options.len());
1243
1244 for (index, (outcome, kind, label)) in options.into_iter().enumerate() {
1245 outcomes.push(outcome);
1246 acp_options.push(acp::PermissionOption {
1247 id: acp::PermissionOptionId(index.to_string().into()),
1248 label,
1249 kind,
1250 })
1251 }
1252
1253 let response = cx
1254 .update(|cx| {
1255 self.thread.borrow().update(cx, |thread, cx| {
1256 thread.request_tool_call_permission(tool_call, acp_options, cx)
1257 })
1258 })?
1259 .context("Failed to update thread")?
1260 .await;
1261
1262 let outcome = match response {
1263 Ok(option_id) => outcomes[option_id.0.parse::<usize>().unwrap_or(0)],
1264 Err(oneshot::Canceled) => acp_old::ToolCallConfirmationOutcome::Cancel,
1265 };
1266
1267 Ok(acp_old::RequestToolCallConfirmationResponse {
1268 id: acp_old::ToolCallId(old_acp_id),
1269 outcome: outcome,
1270 })
1271 }
1272
1273 async fn push_tool_call(
1274 &self,
1275 request: acp_old::PushToolCallParams,
1276 ) -> Result<acp_old::PushToolCallResponse, acp_old::Error> {
1277 let cx = &mut self.cx.clone();
1278
1279 let old_acp_id = *self.next_tool_call_id.borrow() + 1;
1280 self.next_tool_call_id.replace(old_acp_id);
1281
1282 cx.update(|cx| {
1283 self.thread.borrow().update(cx, |thread, cx| {
1284 thread.upsert_tool_call(
1285 into_new_tool_call(acp::ToolCallId(old_acp_id.to_string().into()), request),
1286 cx,
1287 )
1288 })
1289 })?
1290 .context("Failed to update thread")?;
1291
1292 Ok(acp_old::PushToolCallResponse {
1293 id: acp_old::ToolCallId(old_acp_id),
1294 })
1295 }
1296
1297 async fn update_tool_call(
1298 &self,
1299 request: acp_old::UpdateToolCallParams,
1300 ) -> Result<(), acp_old::Error> {
1301 let cx = &mut self.cx.clone();
1302
1303 cx.update(|cx| {
1304 self.thread.borrow().update(cx, |thread, cx| {
1305 let languages = thread.project.read(cx).languages().clone();
1306
1307 if let Some((ix, tool_call)) = thread
1308 .tool_call_mut(&acp::ToolCallId(request.tool_call_id.0.to_string().into()))
1309 {
1310 tool_call.status = ToolCallStatus::Allowed {
1311 status: into_new_tool_call_status(request.status),
1312 };
1313 tool_call.content = request
1314 .content
1315 .into_iter()
1316 .map(|content| {
1317 ToolCallContent::from_acp(
1318 into_new_tool_call_content(content),
1319 languages.clone(),
1320 cx,
1321 )
1322 })
1323 .collect();
1324
1325 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1326 anyhow::Ok(())
1327 } else {
1328 anyhow::bail!("Tool call not found")
1329 }
1330 })
1331 })?
1332 .context("Failed to update thread")??;
1333
1334 Ok(())
1335 }
1336
1337 async fn update_plan(&self, request: acp_old::UpdatePlanParams) -> Result<(), acp_old::Error> {
1338 let cx = &mut self.cx.clone();
1339
1340 cx.update(|cx| {
1341 self.thread.borrow().update(cx, |thread, cx| {
1342 thread.update_plan(
1343 acp::Plan {
1344 entries: request
1345 .entries
1346 .into_iter()
1347 .map(into_new_plan_entry)
1348 .collect(),
1349 },
1350 cx,
1351 )
1352 })
1353 })?
1354 .context("Failed to update thread")?;
1355
1356 Ok(())
1357 }
1358
1359 async fn read_text_file(
1360 &self,
1361 acp_old::ReadTextFileParams { path, line, limit }: acp_old::ReadTextFileParams,
1362 ) -> Result<acp_old::ReadTextFileResponse, acp_old::Error> {
1363 let content = self
1364 .cx
1365 .update(|cx| {
1366 self.thread.borrow().update(cx, |thread, cx| {
1367 thread.read_text_file(path, line, limit, false, cx)
1368 })
1369 })?
1370 .context("Failed to update thread")?
1371 .await?;
1372 Ok(acp_old::ReadTextFileResponse { content })
1373 }
1374
1375 async fn write_text_file(
1376 &self,
1377 acp_old::WriteTextFileParams { path, content }: acp_old::WriteTextFileParams,
1378 ) -> Result<(), acp_old::Error> {
1379 self.cx
1380 .update(|cx| {
1381 self.thread
1382 .borrow()
1383 .update(cx, |thread, cx| thread.write_text_file(path, content, cx))
1384 })?
1385 .context("Failed to update thread")?
1386 .await?;
1387
1388 Ok(())
1389 }
1390}
1391
1392fn into_new_tool_call(id: acp::ToolCallId, request: acp_old::PushToolCallParams) -> acp::ToolCall {
1393 acp::ToolCall {
1394 id: id,
1395 label: request.label,
1396 kind: acp_kind_from_old_icon(request.icon),
1397 status: acp::ToolCallStatus::InProgress,
1398 content: request
1399 .content
1400 .into_iter()
1401 .map(into_new_tool_call_content)
1402 .collect(),
1403 locations: request
1404 .locations
1405 .into_iter()
1406 .map(into_new_tool_call_location)
1407 .collect(),
1408 }
1409}
1410
1411fn acp_kind_from_old_icon(icon: acp_old::Icon) -> acp::ToolKind {
1412 match icon {
1413 acp_old::Icon::FileSearch => acp::ToolKind::Search,
1414 acp_old::Icon::Folder => acp::ToolKind::Search,
1415 acp_old::Icon::Globe => acp::ToolKind::Search,
1416 acp_old::Icon::Hammer => acp::ToolKind::Other,
1417 acp_old::Icon::LightBulb => acp::ToolKind::Think,
1418 acp_old::Icon::Pencil => acp::ToolKind::Edit,
1419 acp_old::Icon::Regex => acp::ToolKind::Search,
1420 acp_old::Icon::Terminal => acp::ToolKind::Execute,
1421 }
1422}
1423
1424fn into_new_tool_call_status(status: acp_old::ToolCallStatus) -> acp::ToolCallStatus {
1425 match status {
1426 acp_old::ToolCallStatus::Running => acp::ToolCallStatus::InProgress,
1427 acp_old::ToolCallStatus::Finished => acp::ToolCallStatus::Completed,
1428 acp_old::ToolCallStatus::Error => acp::ToolCallStatus::Failed,
1429 }
1430}
1431
1432fn into_new_tool_call_content(content: acp_old::ToolCallContent) -> acp::ToolCallContent {
1433 match content {
1434 acp_old::ToolCallContent::Markdown { markdown } => acp::ToolCallContent::ContentBlock {
1435 content: acp::ContentBlock::Text(acp::TextContent {
1436 annotations: None,
1437 text: markdown,
1438 }),
1439 },
1440 acp_old::ToolCallContent::Diff { diff } => acp::ToolCallContent::Diff {
1441 diff: into_new_diff(diff),
1442 },
1443 }
1444}
1445
1446fn into_new_diff(diff: acp_old::Diff) -> acp::Diff {
1447 acp::Diff {
1448 path: diff.path,
1449 old_text: diff.old_text,
1450 new_text: diff.new_text,
1451 }
1452}
1453
1454fn into_new_tool_call_location(location: acp_old::ToolCallLocation) -> acp::ToolCallLocation {
1455 acp::ToolCallLocation {
1456 path: location.path,
1457 line: location.line,
1458 }
1459}
1460
1461fn into_new_plan_entry(entry: acp_old::PlanEntry) -> acp::PlanEntry {
1462 acp::PlanEntry {
1463 content: entry.content,
1464 priority: into_new_plan_priority(entry.priority),
1465 status: into_new_plan_status(entry.status),
1466 }
1467}
1468
1469fn into_new_plan_priority(priority: acp_old::PlanEntryPriority) -> acp::PlanEntryPriority {
1470 match priority {
1471 acp_old::PlanEntryPriority::Low => acp::PlanEntryPriority::Low,
1472 acp_old::PlanEntryPriority::Medium => acp::PlanEntryPriority::Medium,
1473 acp_old::PlanEntryPriority::High => acp::PlanEntryPriority::High,
1474 }
1475}
1476
1477fn into_new_plan_status(status: acp_old::PlanEntryStatus) -> acp::PlanEntryStatus {
1478 match status {
1479 acp_old::PlanEntryStatus::Pending => acp::PlanEntryStatus::Pending,
1480 acp_old::PlanEntryStatus::InProgress => acp::PlanEntryStatus::InProgress,
1481 acp_old::PlanEntryStatus::Completed => acp::PlanEntryStatus::Completed,
1482 }
1483}
1484
1485#[cfg(test)]
1486mod tests {
1487 use super::*;
1488 use anyhow::anyhow;
1489 use async_pipe::{PipeReader, PipeWriter};
1490 use futures::{channel::mpsc, future::LocalBoxFuture, select};
1491 use gpui::{AsyncApp, TestAppContext};
1492 use indoc::indoc;
1493 use project::FakeFs;
1494 use serde_json::json;
1495 use settings::SettingsStore;
1496 use smol::{future::BoxedLocal, stream::StreamExt as _};
1497 use std::{cell::RefCell, rc::Rc, time::Duration};
1498 use util::path;
1499
1500 fn init_test(cx: &mut TestAppContext) {
1501 env_logger::try_init().ok();
1502 cx.update(|cx| {
1503 let settings_store = SettingsStore::test(cx);
1504 cx.set_global(settings_store);
1505 Project::init_settings(cx);
1506 language::init(cx);
1507 });
1508 }
1509
1510 #[gpui::test]
1511 async fn test_thinking_concatenation(cx: &mut TestAppContext) {
1512 init_test(cx);
1513
1514 let fs = FakeFs::new(cx.executor());
1515 let project = Project::test(fs, [], cx).await;
1516 let (thread, fake_server) = fake_acp_thread(project, cx);
1517
1518 fake_server.update(cx, |fake_server, _| {
1519 fake_server.on_user_message(move |_, server, mut cx| async move {
1520 server
1521 .update(&mut cx, |server, _| {
1522 server.send_to_zed(acp_old::StreamAssistantMessageChunkParams {
1523 chunk: acp_old::AssistantMessageChunk::Thought {
1524 thought: "Thinking ".into(),
1525 },
1526 })
1527 })?
1528 .await
1529 .unwrap();
1530 server
1531 .update(&mut cx, |server, _| {
1532 server.send_to_zed(acp_old::StreamAssistantMessageChunkParams {
1533 chunk: acp_old::AssistantMessageChunk::Thought {
1534 thought: "hard!".into(),
1535 },
1536 })
1537 })?
1538 .await
1539 .unwrap();
1540
1541 Ok(())
1542 })
1543 });
1544
1545 thread
1546 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
1547 .await
1548 .unwrap();
1549
1550 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
1551 assert_eq!(
1552 output,
1553 indoc! {r#"
1554 ## User
1555
1556 Hello from Zed!
1557
1558 ## Assistant
1559
1560 <thinking>
1561 Thinking hard!
1562 </thinking>
1563
1564 "#}
1565 );
1566 }
1567
1568 #[gpui::test]
1569 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
1570 init_test(cx);
1571
1572 let fs = FakeFs::new(cx.executor());
1573 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
1574 .await;
1575 let project = Project::test(fs.clone(), [], cx).await;
1576 let (thread, fake_server) = fake_acp_thread(project.clone(), cx);
1577 let (worktree, pathbuf) = project
1578 .update(cx, |project, cx| {
1579 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
1580 })
1581 .await
1582 .unwrap();
1583 let buffer = project
1584 .update(cx, |project, cx| {
1585 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
1586 })
1587 .await
1588 .unwrap();
1589
1590 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
1591 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
1592
1593 fake_server.update(cx, |fake_server, _| {
1594 fake_server.on_user_message(move |_, server, mut cx| {
1595 let read_file_tx = read_file_tx.clone();
1596 async move {
1597 let content = server
1598 .update(&mut cx, |server, _| {
1599 server.send_to_zed(acp_old::ReadTextFileParams {
1600 path: path!("/tmp/foo").into(),
1601 line: None,
1602 limit: None,
1603 })
1604 })?
1605 .await
1606 .unwrap();
1607 assert_eq!(content.content, "one\ntwo\nthree\n");
1608 read_file_tx.take().unwrap().send(()).unwrap();
1609 server
1610 .update(&mut cx, |server, _| {
1611 server.send_to_zed(acp_old::WriteTextFileParams {
1612 path: path!("/tmp/foo").into(),
1613 content: "one\ntwo\nthree\nfour\nfive\n".to_string(),
1614 })
1615 })?
1616 .await
1617 .unwrap();
1618 Ok(())
1619 }
1620 })
1621 });
1622
1623 let request = thread.update(cx, |thread, cx| {
1624 thread.send_raw("Extend the count in /tmp/foo", cx)
1625 });
1626 read_file_rx.await.ok();
1627 buffer.update(cx, |buffer, cx| {
1628 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
1629 });
1630 cx.run_until_parked();
1631 assert_eq!(
1632 buffer.read_with(cx, |buffer, _| buffer.text()),
1633 "zero\none\ntwo\nthree\nfour\nfive\n"
1634 );
1635 assert_eq!(
1636 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
1637 "zero\none\ntwo\nthree\nfour\nfive\n"
1638 );
1639 request.await.unwrap();
1640 }
1641
1642 #[gpui::test]
1643 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
1644 init_test(cx);
1645
1646 let fs = FakeFs::new(cx.executor());
1647 let project = Project::test(fs, [], cx).await;
1648 let (thread, fake_server) = fake_acp_thread(project, cx);
1649
1650 let (end_turn_tx, end_turn_rx) = oneshot::channel::<()>();
1651
1652 let tool_call_id = Rc::new(RefCell::new(None));
1653 let end_turn_rx = Rc::new(RefCell::new(Some(end_turn_rx)));
1654 fake_server.update(cx, |fake_server, _| {
1655 let tool_call_id = tool_call_id.clone();
1656 fake_server.on_user_message(move |_, server, mut cx| {
1657 let end_turn_rx = end_turn_rx.clone();
1658 let tool_call_id = tool_call_id.clone();
1659 async move {
1660 let tool_call_result = server
1661 .update(&mut cx, |server, _| {
1662 server.send_to_zed(acp_old::PushToolCallParams {
1663 label: "Fetch".to_string(),
1664 icon: acp_old::Icon::Globe,
1665 content: None,
1666 locations: vec![],
1667 })
1668 })?
1669 .await
1670 .unwrap();
1671 *tool_call_id.clone().borrow_mut() = Some(tool_call_result.id);
1672 end_turn_rx.take().unwrap().await.ok();
1673
1674 Ok(())
1675 }
1676 })
1677 });
1678
1679 let request = thread.update(cx, |thread, cx| {
1680 thread.send_raw("Fetch https://example.com", cx)
1681 });
1682
1683 run_until_first_tool_call(&thread, cx).await;
1684
1685 thread.read_with(cx, |thread, _| {
1686 assert!(matches!(
1687 thread.entries[1],
1688 AgentThreadEntry::ToolCall(ToolCall {
1689 status: ToolCallStatus::Allowed {
1690 status: acp::ToolCallStatus::InProgress,
1691 ..
1692 },
1693 ..
1694 })
1695 ));
1696 });
1697
1698 cx.run_until_parked();
1699
1700 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
1701
1702 thread.read_with(cx, |thread, _| {
1703 assert!(matches!(
1704 &thread.entries[1],
1705 AgentThreadEntry::ToolCall(ToolCall {
1706 status: ToolCallStatus::Canceled,
1707 ..
1708 })
1709 ));
1710 });
1711
1712 fake_server
1713 .update(cx, |fake_server, _| {
1714 fake_server.send_to_zed(acp_old::UpdateToolCallParams {
1715 tool_call_id: tool_call_id.borrow().unwrap(),
1716 status: acp_old::ToolCallStatus::Finished,
1717 content: None,
1718 })
1719 })
1720 .await
1721 .unwrap();
1722
1723 drop(end_turn_tx);
1724 request.await.unwrap();
1725
1726 thread.read_with(cx, |thread, _| {
1727 assert!(matches!(
1728 thread.entries[1],
1729 AgentThreadEntry::ToolCall(ToolCall {
1730 status: ToolCallStatus::Allowed {
1731 status: acp::ToolCallStatus::Completed,
1732 ..
1733 },
1734 ..
1735 })
1736 ));
1737 });
1738 }
1739
1740 async fn run_until_first_tool_call(
1741 thread: &Entity<AcpThread>,
1742 cx: &mut TestAppContext,
1743 ) -> usize {
1744 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
1745
1746 let subscription = cx.update(|cx| {
1747 cx.subscribe(thread, move |thread, _, cx| {
1748 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
1749 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
1750 return tx.try_send(ix).unwrap();
1751 }
1752 }
1753 })
1754 });
1755
1756 select! {
1757 _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
1758 panic!("Timeout waiting for tool call")
1759 }
1760 ix = rx.next().fuse() => {
1761 drop(subscription);
1762 ix.unwrap()
1763 }
1764 }
1765 }
1766
1767 pub fn fake_acp_thread(
1768 project: Entity<Project>,
1769 cx: &mut TestAppContext,
1770 ) -> (Entity<AcpThread>, Entity<FakeAcpServer>) {
1771 let (stdin_tx, stdin_rx) = async_pipe::pipe();
1772 let (stdout_tx, stdout_rx) = async_pipe::pipe();
1773
1774 let thread = cx.new(|cx| {
1775 let foreground_executor = cx.foreground_executor().clone();
1776 let thread_rc = Rc::new(RefCell::new(cx.entity().downgrade()));
1777
1778 let (connection, io_fut) = acp_old::AgentConnection::connect_to_agent(
1779 OldAcpClientDelegate::new(thread_rc.clone(), cx.to_async()),
1780 stdin_tx,
1781 stdout_rx,
1782 move |fut| {
1783 foreground_executor.spawn(fut).detach();
1784 },
1785 );
1786
1787 let io_task = cx.background_spawn({
1788 async move {
1789 io_fut.await.log_err();
1790 Ok(())
1791 }
1792 });
1793 let connection = OldAcpAgentConnection {
1794 connection,
1795 child_status: io_task,
1796 thread: thread_rc,
1797 };
1798
1799 AcpThread::new(
1800 Rc::new(connection),
1801 "Test".into(),
1802 None,
1803 project,
1804 acp::SessionId("test".into()),
1805 cx,
1806 )
1807 });
1808 let agent = cx.update(|cx| cx.new(|cx| FakeAcpServer::new(stdin_rx, stdout_tx, cx)));
1809 (thread, agent)
1810 }
1811
1812 pub struct FakeAcpServer {
1813 connection: acp_old::ClientConnection,
1814
1815 _io_task: Task<()>,
1816 on_user_message: Option<
1817 Rc<
1818 dyn Fn(
1819 acp_old::SendUserMessageParams,
1820 Entity<FakeAcpServer>,
1821 AsyncApp,
1822 ) -> LocalBoxFuture<'static, Result<(), acp_old::Error>>,
1823 >,
1824 >,
1825 }
1826
1827 #[derive(Clone)]
1828 struct FakeAgent {
1829 server: Entity<FakeAcpServer>,
1830 cx: AsyncApp,
1831 }
1832
1833 impl acp_old::Agent for FakeAgent {
1834 async fn initialize(
1835 &self,
1836 params: acp_old::InitializeParams,
1837 ) -> Result<acp_old::InitializeResponse, acp_old::Error> {
1838 Ok(acp_old::InitializeResponse {
1839 protocol_version: params.protocol_version,
1840 is_authenticated: true,
1841 })
1842 }
1843
1844 async fn authenticate(&self) -> Result<(), acp_old::Error> {
1845 Ok(())
1846 }
1847
1848 async fn cancel_send_message(&self) -> Result<(), acp_old::Error> {
1849 Ok(())
1850 }
1851
1852 async fn send_user_message(
1853 &self,
1854 request: acp_old::SendUserMessageParams,
1855 ) -> Result<(), acp_old::Error> {
1856 let mut cx = self.cx.clone();
1857 let handler = self
1858 .server
1859 .update(&mut cx, |server, _| server.on_user_message.clone())
1860 .ok()
1861 .flatten();
1862 if let Some(handler) = handler {
1863 handler(request, self.server.clone(), self.cx.clone()).await
1864 } else {
1865 Err(anyhow::anyhow!("No handler for on_user_message").into())
1866 }
1867 }
1868 }
1869
1870 impl FakeAcpServer {
1871 fn new(stdin: PipeReader, stdout: PipeWriter, cx: &Context<Self>) -> Self {
1872 let agent = FakeAgent {
1873 server: cx.entity(),
1874 cx: cx.to_async(),
1875 };
1876 let foreground_executor = cx.foreground_executor().clone();
1877
1878 let (connection, io_fut) = acp_old::ClientConnection::connect_to_client(
1879 agent.clone(),
1880 stdout,
1881 stdin,
1882 move |fut| {
1883 foreground_executor.spawn(fut).detach();
1884 },
1885 );
1886 FakeAcpServer {
1887 connection: connection,
1888 on_user_message: None,
1889 _io_task: cx.background_spawn(async move {
1890 io_fut.await.log_err();
1891 }),
1892 }
1893 }
1894
1895 fn on_user_message<F>(
1896 &mut self,
1897 handler: impl for<'a> Fn(
1898 acp_old::SendUserMessageParams,
1899 Entity<FakeAcpServer>,
1900 AsyncApp,
1901 ) -> F
1902 + 'static,
1903 ) where
1904 F: Future<Output = Result<(), acp_old::Error>> + 'static,
1905 {
1906 self.on_user_message
1907 .replace(Rc::new(move |request, server, cx| {
1908 handler(request, server, cx).boxed_local()
1909 }));
1910 }
1911
1912 fn send_to_zed<T: acp_old::ClientRequest + 'static>(
1913 &self,
1914 message: T,
1915 ) -> BoxedLocal<Result<T::Response>> {
1916 self.connection
1917 .request(message)
1918 .map(|f| f.map_err(|err| anyhow!(err)))
1919 .boxed_local()
1920 }
1921 }
1922}