1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5
6pub use connection::*;
7pub use diff::*;
8pub use mention::*;
9pub use terminal::*;
10
11use action_log::ActionLog;
12use agent_client_protocol as acp;
13use anyhow::{Context as _, Result, anyhow};
14use editor::Bias;
15use futures::{FutureExt, channel::oneshot, future::BoxFuture};
16use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
17use itertools::Itertools;
18use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
19use markdown::Markdown;
20use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
21use std::collections::HashMap;
22use std::error::Error;
23use std::fmt::{Formatter, Write};
24use std::ops::Range;
25use std::process::ExitStatus;
26use std::rc::Rc;
27use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
28use ui::App;
29use util::ResultExt;
30
31#[derive(Debug)]
32pub struct UserMessage {
33 pub id: Option<UserMessageId>,
34 pub content: ContentBlock,
35 pub chunks: Vec<acp::ContentBlock>,
36 pub checkpoint: Option<Checkpoint>,
37}
38
39#[derive(Debug)]
40pub struct Checkpoint {
41 git_checkpoint: GitStoreCheckpoint,
42 pub show: bool,
43}
44
45impl UserMessage {
46 fn to_markdown(&self, cx: &App) -> String {
47 let mut markdown = String::new();
48 if self
49 .checkpoint
50 .as_ref()
51 .map_or(false, |checkpoint| checkpoint.show)
52 {
53 writeln!(markdown, "## User (checkpoint)").unwrap();
54 } else {
55 writeln!(markdown, "## User").unwrap();
56 }
57 writeln!(markdown).unwrap();
58 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
59 writeln!(markdown).unwrap();
60 markdown
61 }
62}
63
64#[derive(Debug, PartialEq)]
65pub struct AssistantMessage {
66 pub chunks: Vec<AssistantMessageChunk>,
67}
68
69impl AssistantMessage {
70 pub fn to_markdown(&self, cx: &App) -> String {
71 format!(
72 "## Assistant\n\n{}\n\n",
73 self.chunks
74 .iter()
75 .map(|chunk| chunk.to_markdown(cx))
76 .join("\n\n")
77 )
78 }
79}
80
81#[derive(Debug, PartialEq)]
82pub enum AssistantMessageChunk {
83 Message { block: ContentBlock },
84 Thought { block: ContentBlock },
85}
86
87impl AssistantMessageChunk {
88 pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
89 Self::Message {
90 block: ContentBlock::new(chunk.into(), language_registry, cx),
91 }
92 }
93
94 fn to_markdown(&self, cx: &App) -> String {
95 match self {
96 Self::Message { block } => block.to_markdown(cx).to_string(),
97 Self::Thought { block } => {
98 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
99 }
100 }
101 }
102}
103
104#[derive(Debug)]
105pub enum AgentThreadEntry {
106 UserMessage(UserMessage),
107 AssistantMessage(AssistantMessage),
108 ToolCall(ToolCall),
109}
110
111impl AgentThreadEntry {
112 pub fn to_markdown(&self, cx: &App) -> String {
113 match self {
114 Self::UserMessage(message) => message.to_markdown(cx),
115 Self::AssistantMessage(message) => message.to_markdown(cx),
116 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
117 }
118 }
119
120 pub fn user_message(&self) -> Option<&UserMessage> {
121 if let AgentThreadEntry::UserMessage(message) = self {
122 Some(message)
123 } else {
124 None
125 }
126 }
127
128 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
129 if let AgentThreadEntry::ToolCall(call) = self {
130 itertools::Either::Left(call.diffs())
131 } else {
132 itertools::Either::Right(std::iter::empty())
133 }
134 }
135
136 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
137 if let AgentThreadEntry::ToolCall(call) = self {
138 itertools::Either::Left(call.terminals())
139 } else {
140 itertools::Either::Right(std::iter::empty())
141 }
142 }
143
144 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
145 if let AgentThreadEntry::ToolCall(ToolCall {
146 locations,
147 resolved_locations,
148 ..
149 }) = self
150 {
151 Some((
152 locations.get(ix)?.clone(),
153 resolved_locations.get(ix)?.clone()?,
154 ))
155 } else {
156 None
157 }
158 }
159}
160
161#[derive(Debug)]
162pub struct ToolCall {
163 pub id: acp::ToolCallId,
164 pub label: Entity<Markdown>,
165 pub kind: acp::ToolKind,
166 pub content: Vec<ToolCallContent>,
167 pub status: ToolCallStatus,
168 pub locations: Vec<acp::ToolCallLocation>,
169 pub resolved_locations: Vec<Option<AgentLocation>>,
170 pub raw_input: Option<serde_json::Value>,
171 pub raw_output: Option<serde_json::Value>,
172}
173
174impl ToolCall {
175 fn from_acp(
176 tool_call: acp::ToolCall,
177 status: ToolCallStatus,
178 language_registry: Arc<LanguageRegistry>,
179 cx: &mut App,
180 ) -> Self {
181 Self {
182 id: tool_call.id,
183 label: cx.new(|cx| {
184 Markdown::new(
185 tool_call.title.into(),
186 Some(language_registry.clone()),
187 None,
188 cx,
189 )
190 }),
191 kind: tool_call.kind,
192 content: tool_call
193 .content
194 .into_iter()
195 .map(|content| ToolCallContent::from_acp(content, language_registry.clone(), cx))
196 .collect(),
197 locations: tool_call.locations,
198 resolved_locations: Vec::default(),
199 status,
200 raw_input: tool_call.raw_input,
201 raw_output: tool_call.raw_output,
202 }
203 }
204
205 fn update_fields(
206 &mut self,
207 fields: acp::ToolCallUpdateFields,
208 language_registry: Arc<LanguageRegistry>,
209 cx: &mut App,
210 ) {
211 let acp::ToolCallUpdateFields {
212 kind,
213 status,
214 title,
215 content,
216 locations,
217 raw_input,
218 raw_output,
219 } = fields;
220
221 if let Some(kind) = kind {
222 self.kind = kind;
223 }
224
225 if let Some(status) = status {
226 self.status = ToolCallStatus::Allowed { status };
227 }
228
229 if let Some(title) = title {
230 self.label.update(cx, |label, cx| {
231 label.replace(title, cx);
232 });
233 }
234
235 if let Some(content) = content {
236 self.content = content
237 .into_iter()
238 .map(|chunk| ToolCallContent::from_acp(chunk, language_registry.clone(), cx))
239 .collect();
240 }
241
242 if let Some(locations) = locations {
243 self.locations = locations;
244 }
245
246 if let Some(raw_input) = raw_input {
247 self.raw_input = Some(raw_input);
248 }
249
250 if let Some(raw_output) = raw_output {
251 if self.content.is_empty() {
252 if let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
253 {
254 self.content
255 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
256 markdown,
257 }));
258 }
259 }
260 self.raw_output = Some(raw_output);
261 }
262 }
263
264 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
265 self.content.iter().filter_map(|content| match content {
266 ToolCallContent::Diff(diff) => Some(diff),
267 ToolCallContent::ContentBlock(_) => None,
268 ToolCallContent::Terminal(_) => None,
269 })
270 }
271
272 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
273 self.content.iter().filter_map(|content| match content {
274 ToolCallContent::Terminal(terminal) => Some(terminal),
275 ToolCallContent::ContentBlock(_) => None,
276 ToolCallContent::Diff(_) => None,
277 })
278 }
279
280 fn to_markdown(&self, cx: &App) -> String {
281 let mut markdown = format!(
282 "**Tool Call: {}**\nStatus: {}\n\n",
283 self.label.read(cx).source(),
284 self.status
285 );
286 for content in &self.content {
287 markdown.push_str(content.to_markdown(cx).as_str());
288 markdown.push_str("\n\n");
289 }
290 markdown
291 }
292
293 async fn resolve_location(
294 location: acp::ToolCallLocation,
295 project: WeakEntity<Project>,
296 cx: &mut AsyncApp,
297 ) -> Option<AgentLocation> {
298 let buffer = project
299 .update(cx, |project, cx| {
300 if let Some(path) = project.project_path_for_absolute_path(&location.path, cx) {
301 Some(project.open_buffer(path, cx))
302 } else {
303 None
304 }
305 })
306 .ok()??;
307 let buffer = buffer.await.log_err()?;
308 let position = buffer
309 .update(cx, |buffer, _| {
310 if let Some(row) = location.line {
311 let snapshot = buffer.snapshot();
312 let column = snapshot.indent_size_for_line(row).len;
313 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
314 snapshot.anchor_before(point)
315 } else {
316 Anchor::MIN
317 }
318 })
319 .ok()?;
320
321 Some(AgentLocation {
322 buffer: buffer.downgrade(),
323 position,
324 })
325 }
326
327 fn resolve_locations(
328 &self,
329 project: Entity<Project>,
330 cx: &mut App,
331 ) -> Task<Vec<Option<AgentLocation>>> {
332 let locations = self.locations.clone();
333 project.update(cx, |_, cx| {
334 cx.spawn(async move |project, cx| {
335 let mut new_locations = Vec::new();
336 for location in locations {
337 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
338 }
339 new_locations
340 })
341 })
342 }
343}
344
345#[derive(Debug)]
346pub enum ToolCallStatus {
347 WaitingForConfirmation {
348 options: Vec<acp::PermissionOption>,
349 respond_tx: oneshot::Sender<acp::PermissionOptionId>,
350 },
351 Allowed {
352 status: acp::ToolCallStatus,
353 },
354 Rejected,
355 Canceled,
356}
357
358impl Display for ToolCallStatus {
359 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
360 write!(
361 f,
362 "{}",
363 match self {
364 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
365 ToolCallStatus::Allowed { status } => match status {
366 acp::ToolCallStatus::Pending => "Pending",
367 acp::ToolCallStatus::InProgress => "In Progress",
368 acp::ToolCallStatus::Completed => "Completed",
369 acp::ToolCallStatus::Failed => "Failed",
370 },
371 ToolCallStatus::Rejected => "Rejected",
372 ToolCallStatus::Canceled => "Canceled",
373 }
374 )
375 }
376}
377
378#[derive(Debug, PartialEq, Clone)]
379pub enum ContentBlock {
380 Empty,
381 Markdown { markdown: Entity<Markdown> },
382 ResourceLink { resource_link: acp::ResourceLink },
383}
384
385impl ContentBlock {
386 pub fn new(
387 block: acp::ContentBlock,
388 language_registry: &Arc<LanguageRegistry>,
389 cx: &mut App,
390 ) -> Self {
391 let mut this = Self::Empty;
392 this.append(block, language_registry, cx);
393 this
394 }
395
396 pub fn new_combined(
397 blocks: impl IntoIterator<Item = acp::ContentBlock>,
398 language_registry: Arc<LanguageRegistry>,
399 cx: &mut App,
400 ) -> Self {
401 let mut this = Self::Empty;
402 for block in blocks {
403 this.append(block, &language_registry, cx);
404 }
405 this
406 }
407
408 pub fn append(
409 &mut self,
410 block: acp::ContentBlock,
411 language_registry: &Arc<LanguageRegistry>,
412 cx: &mut App,
413 ) {
414 if matches!(self, ContentBlock::Empty) {
415 if let acp::ContentBlock::ResourceLink(resource_link) = block {
416 *self = ContentBlock::ResourceLink { resource_link };
417 return;
418 }
419 }
420
421 let new_content = self.block_string_contents(block);
422
423 match self {
424 ContentBlock::Empty => {
425 *self = Self::create_markdown_block(new_content, language_registry, cx);
426 }
427 ContentBlock::Markdown { markdown } => {
428 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
429 }
430 ContentBlock::ResourceLink { resource_link } => {
431 let existing_content = Self::resource_link_md(&resource_link.uri);
432 let combined = format!("{}\n{}", existing_content, new_content);
433
434 *self = Self::create_markdown_block(combined, language_registry, cx);
435 }
436 }
437 }
438
439 fn create_markdown_block(
440 content: String,
441 language_registry: &Arc<LanguageRegistry>,
442 cx: &mut App,
443 ) -> ContentBlock {
444 ContentBlock::Markdown {
445 markdown: cx
446 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
447 }
448 }
449
450 fn block_string_contents(&self, block: acp::ContentBlock) -> String {
451 match block {
452 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
453 acp::ContentBlock::ResourceLink(resource_link) => {
454 Self::resource_link_md(&resource_link.uri)
455 }
456 acp::ContentBlock::Resource(acp::EmbeddedResource {
457 resource:
458 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
459 uri,
460 ..
461 }),
462 ..
463 }) => Self::resource_link_md(&uri),
464 acp::ContentBlock::Image(image) => Self::image_md(&image),
465 acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
466 }
467 }
468
469 fn resource_link_md(uri: &str) -> String {
470 if let Some(uri) = MentionUri::parse(&uri).log_err() {
471 uri.as_link().to_string()
472 } else {
473 uri.to_string()
474 }
475 }
476
477 fn image_md(_image: &acp::ImageContent) -> String {
478 "`Image`".into()
479 }
480
481 fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
482 match self {
483 ContentBlock::Empty => "",
484 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
485 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
486 }
487 }
488
489 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
490 match self {
491 ContentBlock::Empty => None,
492 ContentBlock::Markdown { markdown } => Some(markdown),
493 ContentBlock::ResourceLink { .. } => None,
494 }
495 }
496
497 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
498 match self {
499 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
500 _ => None,
501 }
502 }
503}
504
505#[derive(Debug)]
506pub enum ToolCallContent {
507 ContentBlock(ContentBlock),
508 Diff(Entity<Diff>),
509 Terminal(Entity<Terminal>),
510}
511
512impl ToolCallContent {
513 pub fn from_acp(
514 content: acp::ToolCallContent,
515 language_registry: Arc<LanguageRegistry>,
516 cx: &mut App,
517 ) -> Self {
518 match content {
519 acp::ToolCallContent::Content { content } => {
520 Self::ContentBlock(ContentBlock::new(content, &language_registry, cx))
521 }
522 acp::ToolCallContent::Diff { diff } => {
523 Self::Diff(cx.new(|cx| Diff::from_acp(diff, language_registry, cx)))
524 }
525 }
526 }
527
528 pub fn to_markdown(&self, cx: &App) -> String {
529 match self {
530 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
531 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
532 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
533 }
534 }
535}
536
537#[derive(Debug, PartialEq)]
538pub enum ToolCallUpdate {
539 UpdateFields(acp::ToolCallUpdate),
540 UpdateDiff(ToolCallUpdateDiff),
541 UpdateTerminal(ToolCallUpdateTerminal),
542}
543
544impl ToolCallUpdate {
545 fn id(&self) -> &acp::ToolCallId {
546 match self {
547 Self::UpdateFields(update) => &update.id,
548 Self::UpdateDiff(diff) => &diff.id,
549 Self::UpdateTerminal(terminal) => &terminal.id,
550 }
551 }
552}
553
554impl From<acp::ToolCallUpdate> for ToolCallUpdate {
555 fn from(update: acp::ToolCallUpdate) -> Self {
556 Self::UpdateFields(update)
557 }
558}
559
560impl From<ToolCallUpdateDiff> for ToolCallUpdate {
561 fn from(diff: ToolCallUpdateDiff) -> Self {
562 Self::UpdateDiff(diff)
563 }
564}
565
566#[derive(Debug, PartialEq)]
567pub struct ToolCallUpdateDiff {
568 pub id: acp::ToolCallId,
569 pub diff: Entity<Diff>,
570}
571
572impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
573 fn from(terminal: ToolCallUpdateTerminal) -> Self {
574 Self::UpdateTerminal(terminal)
575 }
576}
577
578#[derive(Debug, PartialEq)]
579pub struct ToolCallUpdateTerminal {
580 pub id: acp::ToolCallId,
581 pub terminal: Entity<Terminal>,
582}
583
584#[derive(Debug, Default)]
585pub struct Plan {
586 pub entries: Vec<PlanEntry>,
587}
588
589#[derive(Debug)]
590pub struct PlanStats<'a> {
591 pub in_progress_entry: Option<&'a PlanEntry>,
592 pub pending: u32,
593 pub completed: u32,
594}
595
596impl Plan {
597 pub fn is_empty(&self) -> bool {
598 self.entries.is_empty()
599 }
600
601 pub fn stats(&self) -> PlanStats<'_> {
602 let mut stats = PlanStats {
603 in_progress_entry: None,
604 pending: 0,
605 completed: 0,
606 };
607
608 for entry in &self.entries {
609 match &entry.status {
610 acp::PlanEntryStatus::Pending => {
611 stats.pending += 1;
612 }
613 acp::PlanEntryStatus::InProgress => {
614 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
615 }
616 acp::PlanEntryStatus::Completed => {
617 stats.completed += 1;
618 }
619 }
620 }
621
622 stats
623 }
624}
625
626#[derive(Debug)]
627pub struct PlanEntry {
628 pub content: Entity<Markdown>,
629 pub priority: acp::PlanEntryPriority,
630 pub status: acp::PlanEntryStatus,
631}
632
633impl PlanEntry {
634 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
635 Self {
636 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
637 priority: entry.priority,
638 status: entry.status,
639 }
640 }
641}
642
643pub struct AcpThread {
644 title: SharedString,
645 entries: Vec<AgentThreadEntry>,
646 plan: Plan,
647 project: Entity<Project>,
648 action_log: Entity<ActionLog>,
649 shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
650 send_task: Option<Task<()>>,
651 connection: Rc<dyn AgentConnection>,
652 session_id: acp::SessionId,
653}
654
655pub enum AcpThreadEvent {
656 NewEntry,
657 EntryUpdated(usize),
658 EntriesRemoved(Range<usize>),
659 ToolAuthorizationRequired,
660 Stopped,
661 Error,
662 ServerExited(ExitStatus),
663}
664
665impl EventEmitter<AcpThreadEvent> for AcpThread {}
666
667#[derive(PartialEq, Eq)]
668pub enum ThreadStatus {
669 Idle,
670 WaitingForToolConfirmation,
671 Generating,
672}
673
674#[derive(Debug, Clone)]
675pub enum LoadError {
676 Unsupported {
677 error_message: SharedString,
678 upgrade_message: SharedString,
679 upgrade_command: String,
680 },
681 Exited(i32),
682 Other(SharedString),
683}
684
685impl Display for LoadError {
686 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
687 match self {
688 LoadError::Unsupported { error_message, .. } => write!(f, "{}", error_message),
689 LoadError::Exited(status) => write!(f, "Server exited with status {}", status),
690 LoadError::Other(msg) => write!(f, "{}", msg),
691 }
692 }
693}
694
695impl Error for LoadError {}
696
697impl AcpThread {
698 pub fn new(
699 title: impl Into<SharedString>,
700 connection: Rc<dyn AgentConnection>,
701 project: Entity<Project>,
702 session_id: acp::SessionId,
703 cx: &mut Context<Self>,
704 ) -> Self {
705 let action_log = cx.new(|_| ActionLog::new(project.clone()));
706
707 Self {
708 action_log,
709 shared_buffers: Default::default(),
710 entries: Default::default(),
711 plan: Default::default(),
712 title: title.into(),
713 project,
714 send_task: None,
715 connection,
716 session_id,
717 }
718 }
719
720 pub fn connection(&self) -> &Rc<dyn AgentConnection> {
721 &self.connection
722 }
723
724 pub fn action_log(&self) -> &Entity<ActionLog> {
725 &self.action_log
726 }
727
728 pub fn project(&self) -> &Entity<Project> {
729 &self.project
730 }
731
732 pub fn title(&self) -> SharedString {
733 self.title.clone()
734 }
735
736 pub fn entries(&self) -> &[AgentThreadEntry] {
737 &self.entries
738 }
739
740 pub fn session_id(&self) -> &acp::SessionId {
741 &self.session_id
742 }
743
744 pub fn status(&self) -> ThreadStatus {
745 if self.send_task.is_some() {
746 if self.waiting_for_tool_confirmation() {
747 ThreadStatus::WaitingForToolConfirmation
748 } else {
749 ThreadStatus::Generating
750 }
751 } else {
752 ThreadStatus::Idle
753 }
754 }
755
756 pub fn has_pending_edit_tool_calls(&self) -> bool {
757 for entry in self.entries.iter().rev() {
758 match entry {
759 AgentThreadEntry::UserMessage(_) => return false,
760 AgentThreadEntry::ToolCall(
761 call @ ToolCall {
762 status:
763 ToolCallStatus::Allowed {
764 status:
765 acp::ToolCallStatus::InProgress | acp::ToolCallStatus::Pending,
766 },
767 ..
768 },
769 ) if call.diffs().next().is_some() => {
770 return true;
771 }
772 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
773 }
774 }
775
776 false
777 }
778
779 pub fn used_tools_since_last_user_message(&self) -> bool {
780 for entry in self.entries.iter().rev() {
781 match entry {
782 AgentThreadEntry::UserMessage(..) => return false,
783 AgentThreadEntry::AssistantMessage(..) => continue,
784 AgentThreadEntry::ToolCall(..) => return true,
785 }
786 }
787
788 false
789 }
790
791 pub fn handle_session_update(
792 &mut self,
793 update: acp::SessionUpdate,
794 cx: &mut Context<Self>,
795 ) -> Result<()> {
796 match update {
797 acp::SessionUpdate::UserMessageChunk { content } => {
798 self.push_user_content_block(None, content, cx);
799 }
800 acp::SessionUpdate::AgentMessageChunk { content } => {
801 self.push_assistant_content_block(content, false, cx);
802 }
803 acp::SessionUpdate::AgentThoughtChunk { content } => {
804 self.push_assistant_content_block(content, true, cx);
805 }
806 acp::SessionUpdate::ToolCall(tool_call) => {
807 self.upsert_tool_call(tool_call, cx);
808 }
809 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
810 self.update_tool_call(tool_call_update, cx)?;
811 }
812 acp::SessionUpdate::Plan(plan) => {
813 self.update_plan(plan, cx);
814 }
815 }
816 Ok(())
817 }
818
819 pub fn push_user_content_block(
820 &mut self,
821 message_id: Option<UserMessageId>,
822 chunk: acp::ContentBlock,
823 cx: &mut Context<Self>,
824 ) {
825 let language_registry = self.project.read(cx).languages().clone();
826 let entries_len = self.entries.len();
827
828 if let Some(last_entry) = self.entries.last_mut()
829 && let AgentThreadEntry::UserMessage(UserMessage {
830 id,
831 content,
832 chunks,
833 ..
834 }) = last_entry
835 {
836 *id = message_id.or(id.take());
837 content.append(chunk.clone(), &language_registry, cx);
838 chunks.push(chunk);
839 let idx = entries_len - 1;
840 cx.emit(AcpThreadEvent::EntryUpdated(idx));
841 } else {
842 let content = ContentBlock::new(chunk.clone(), &language_registry, cx);
843 self.push_entry(
844 AgentThreadEntry::UserMessage(UserMessage {
845 id: message_id,
846 content,
847 chunks: vec![chunk],
848 checkpoint: None,
849 }),
850 cx,
851 );
852 }
853 }
854
855 pub fn push_assistant_content_block(
856 &mut self,
857 chunk: acp::ContentBlock,
858 is_thought: bool,
859 cx: &mut Context<Self>,
860 ) {
861 let language_registry = self.project.read(cx).languages().clone();
862 let entries_len = self.entries.len();
863 if let Some(last_entry) = self.entries.last_mut()
864 && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
865 {
866 let idx = entries_len - 1;
867 cx.emit(AcpThreadEvent::EntryUpdated(idx));
868 match (chunks.last_mut(), is_thought) {
869 (Some(AssistantMessageChunk::Message { block }), false)
870 | (Some(AssistantMessageChunk::Thought { block }), true) => {
871 block.append(chunk, &language_registry, cx)
872 }
873 _ => {
874 let block = ContentBlock::new(chunk, &language_registry, cx);
875 if is_thought {
876 chunks.push(AssistantMessageChunk::Thought { block })
877 } else {
878 chunks.push(AssistantMessageChunk::Message { block })
879 }
880 }
881 }
882 } else {
883 let block = ContentBlock::new(chunk, &language_registry, cx);
884 let chunk = if is_thought {
885 AssistantMessageChunk::Thought { block }
886 } else {
887 AssistantMessageChunk::Message { block }
888 };
889
890 self.push_entry(
891 AgentThreadEntry::AssistantMessage(AssistantMessage {
892 chunks: vec![chunk],
893 }),
894 cx,
895 );
896 }
897 }
898
899 fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
900 self.entries.push(entry);
901 cx.emit(AcpThreadEvent::NewEntry);
902 }
903
904 pub fn update_tool_call(
905 &mut self,
906 update: impl Into<ToolCallUpdate>,
907 cx: &mut Context<Self>,
908 ) -> Result<()> {
909 let update = update.into();
910 let languages = self.project.read(cx).languages().clone();
911
912 let (ix, current_call) = self
913 .tool_call_mut(update.id())
914 .context("Tool call not found")?;
915 match update {
916 ToolCallUpdate::UpdateFields(update) => {
917 let location_updated = update.fields.locations.is_some();
918 current_call.update_fields(update.fields, languages, cx);
919 if location_updated {
920 self.resolve_locations(update.id.clone(), cx);
921 }
922 }
923 ToolCallUpdate::UpdateDiff(update) => {
924 current_call.content.clear();
925 current_call
926 .content
927 .push(ToolCallContent::Diff(update.diff));
928 }
929 ToolCallUpdate::UpdateTerminal(update) => {
930 current_call.content.clear();
931 current_call
932 .content
933 .push(ToolCallContent::Terminal(update.terminal));
934 }
935 }
936
937 cx.emit(AcpThreadEvent::EntryUpdated(ix));
938
939 Ok(())
940 }
941
942 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
943 pub fn upsert_tool_call(&mut self, tool_call: acp::ToolCall, cx: &mut Context<Self>) {
944 let status = ToolCallStatus::Allowed {
945 status: tool_call.status,
946 };
947 self.upsert_tool_call_inner(tool_call, status, cx)
948 }
949
950 pub fn upsert_tool_call_inner(
951 &mut self,
952 tool_call: acp::ToolCall,
953 status: ToolCallStatus,
954 cx: &mut Context<Self>,
955 ) {
956 let language_registry = self.project.read(cx).languages().clone();
957 let call = ToolCall::from_acp(tool_call, status, language_registry, cx);
958 let id = call.id.clone();
959
960 if let Some((ix, current_call)) = self.tool_call_mut(&call.id) {
961 *current_call = call;
962
963 cx.emit(AcpThreadEvent::EntryUpdated(ix));
964 } else {
965 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
966 };
967
968 self.resolve_locations(id, cx);
969 }
970
971 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
972 // The tool call we are looking for is typically the last one, or very close to the end.
973 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
974 self.entries
975 .iter_mut()
976 .enumerate()
977 .rev()
978 .find_map(|(index, tool_call)| {
979 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
980 && &tool_call.id == id
981 {
982 Some((index, tool_call))
983 } else {
984 None
985 }
986 })
987 }
988
989 pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
990 let project = self.project.clone();
991 let Some((_, tool_call)) = self.tool_call_mut(&id) else {
992 return;
993 };
994 let task = tool_call.resolve_locations(project, cx);
995 cx.spawn(async move |this, cx| {
996 let resolved_locations = task.await;
997 this.update(cx, |this, cx| {
998 let project = this.project.clone();
999 let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1000 return;
1001 };
1002 if let Some(Some(location)) = resolved_locations.last() {
1003 project.update(cx, |project, cx| {
1004 if let Some(agent_location) = project.agent_location() {
1005 let should_ignore = agent_location.buffer == location.buffer
1006 && location
1007 .buffer
1008 .update(cx, |buffer, _| {
1009 let snapshot = buffer.snapshot();
1010 let old_position =
1011 agent_location.position.to_point(&snapshot);
1012 let new_position = location.position.to_point(&snapshot);
1013 // ignore this so that when we get updates from the edit tool
1014 // the position doesn't reset to the startof line
1015 old_position.row == new_position.row
1016 && old_position.column > new_position.column
1017 })
1018 .ok()
1019 .unwrap_or_default();
1020 if !should_ignore {
1021 project.set_agent_location(Some(location.clone()), cx);
1022 }
1023 }
1024 });
1025 }
1026 if tool_call.resolved_locations != resolved_locations {
1027 tool_call.resolved_locations = resolved_locations;
1028 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1029 }
1030 })
1031 })
1032 .detach();
1033 }
1034
1035 pub fn request_tool_call_authorization(
1036 &mut self,
1037 tool_call: acp::ToolCall,
1038 options: Vec<acp::PermissionOption>,
1039 cx: &mut Context<Self>,
1040 ) -> oneshot::Receiver<acp::PermissionOptionId> {
1041 let (tx, rx) = oneshot::channel();
1042
1043 let status = ToolCallStatus::WaitingForConfirmation {
1044 options,
1045 respond_tx: tx,
1046 };
1047
1048 self.upsert_tool_call_inner(tool_call, status, cx);
1049 cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1050 rx
1051 }
1052
1053 pub fn authorize_tool_call(
1054 &mut self,
1055 id: acp::ToolCallId,
1056 option_id: acp::PermissionOptionId,
1057 option_kind: acp::PermissionOptionKind,
1058 cx: &mut Context<Self>,
1059 ) {
1060 let Some((ix, call)) = self.tool_call_mut(&id) else {
1061 return;
1062 };
1063
1064 let new_status = match option_kind {
1065 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1066 ToolCallStatus::Rejected
1067 }
1068 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1069 ToolCallStatus::Allowed {
1070 status: acp::ToolCallStatus::InProgress,
1071 }
1072 }
1073 };
1074
1075 let curr_status = mem::replace(&mut call.status, new_status);
1076
1077 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1078 respond_tx.send(option_id).log_err();
1079 } else if cfg!(debug_assertions) {
1080 panic!("tried to authorize an already authorized tool call");
1081 }
1082
1083 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1084 }
1085
1086 /// Returns true if the last turn is awaiting tool authorization
1087 pub fn waiting_for_tool_confirmation(&self) -> bool {
1088 for entry in self.entries.iter().rev() {
1089 match &entry {
1090 AgentThreadEntry::ToolCall(call) => match call.status {
1091 ToolCallStatus::WaitingForConfirmation { .. } => return true,
1092 ToolCallStatus::Allowed { .. }
1093 | ToolCallStatus::Rejected
1094 | ToolCallStatus::Canceled => continue,
1095 },
1096 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1097 // Reached the beginning of the turn
1098 return false;
1099 }
1100 }
1101 }
1102 false
1103 }
1104
1105 pub fn plan(&self) -> &Plan {
1106 &self.plan
1107 }
1108
1109 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1110 let new_entries_len = request.entries.len();
1111 let mut new_entries = request.entries.into_iter();
1112
1113 // Reuse existing markdown to prevent flickering
1114 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1115 let PlanEntry {
1116 content,
1117 priority,
1118 status,
1119 } = old;
1120 content.update(cx, |old, cx| {
1121 old.replace(new.content, cx);
1122 });
1123 *priority = new.priority;
1124 *status = new.status;
1125 }
1126 for new in new_entries {
1127 self.plan.entries.push(PlanEntry::from_acp(new, cx))
1128 }
1129 self.plan.entries.truncate(new_entries_len);
1130
1131 cx.notify();
1132 }
1133
1134 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1135 self.plan
1136 .entries
1137 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1138 cx.notify();
1139 }
1140
1141 #[cfg(any(test, feature = "test-support"))]
1142 pub fn send_raw(
1143 &mut self,
1144 message: &str,
1145 cx: &mut Context<Self>,
1146 ) -> BoxFuture<'static, Result<()>> {
1147 self.send(
1148 vec![acp::ContentBlock::Text(acp::TextContent {
1149 text: message.to_string(),
1150 annotations: None,
1151 })],
1152 cx,
1153 )
1154 }
1155
1156 pub fn send(
1157 &mut self,
1158 message: Vec<acp::ContentBlock>,
1159 cx: &mut Context<Self>,
1160 ) -> BoxFuture<'static, Result<()>> {
1161 let block = ContentBlock::new_combined(
1162 message.clone(),
1163 self.project.read(cx).languages().clone(),
1164 cx,
1165 );
1166 let request = acp::PromptRequest {
1167 prompt: message.clone(),
1168 session_id: self.session_id.clone(),
1169 };
1170 let git_store = self.project.read(cx).git_store().clone();
1171
1172 let message_id = if self
1173 .connection
1174 .session_editor(&self.session_id, cx)
1175 .is_some()
1176 {
1177 Some(UserMessageId::new())
1178 } else {
1179 None
1180 };
1181 self.push_entry(
1182 AgentThreadEntry::UserMessage(UserMessage {
1183 id: message_id.clone(),
1184 content: block,
1185 chunks: message,
1186 checkpoint: None,
1187 }),
1188 cx,
1189 );
1190
1191 self.run_turn(cx, async move |this, cx| {
1192 let old_checkpoint = git_store
1193 .update(cx, |git, cx| git.checkpoint(cx))?
1194 .await
1195 .context("failed to get old checkpoint")
1196 .log_err();
1197 this.update(cx, |this, cx| {
1198 if let Some((_ix, message)) = this.last_user_message() {
1199 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1200 git_checkpoint,
1201 show: false,
1202 });
1203 }
1204 this.connection.prompt(message_id, request, cx)
1205 })?
1206 .await
1207 })
1208 }
1209
1210 pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1211 self.run_turn(cx, async move |this, cx| {
1212 this.update(cx, |this, cx| {
1213 this.connection
1214 .resume(&this.session_id, cx)
1215 .map(|resume| resume.run(cx))
1216 })?
1217 .context("resuming a session is not supported")?
1218 .await
1219 })
1220 }
1221
1222 fn run_turn(
1223 &mut self,
1224 cx: &mut Context<Self>,
1225 f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1226 ) -> BoxFuture<'static, Result<()>> {
1227 self.clear_completed_plan_entries(cx);
1228
1229 let (tx, rx) = oneshot::channel();
1230 let cancel_task = self.cancel(cx);
1231
1232 self.send_task = Some(cx.spawn(async move |this, cx| {
1233 cancel_task.await;
1234 tx.send(f(this, cx).await).ok();
1235 }));
1236
1237 cx.spawn(async move |this, cx| {
1238 let response = rx.await;
1239
1240 this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1241 .await?;
1242
1243 this.update(cx, |this, cx| {
1244 match response {
1245 Ok(Err(e)) => {
1246 this.send_task.take();
1247 cx.emit(AcpThreadEvent::Error);
1248 Err(e)
1249 }
1250 result => {
1251 let cancelled = matches!(
1252 result,
1253 Ok(Ok(acp::PromptResponse {
1254 stop_reason: acp::StopReason::Cancelled
1255 }))
1256 );
1257
1258 // We only take the task if the current prompt wasn't cancelled.
1259 //
1260 // This prompt may have been cancelled because another one was sent
1261 // while it was still generating. In these cases, dropping `send_task`
1262 // would cause the next generation to be cancelled.
1263 if !cancelled {
1264 this.send_task.take();
1265 }
1266
1267 cx.emit(AcpThreadEvent::Stopped);
1268 Ok(())
1269 }
1270 }
1271 })?
1272 })
1273 .boxed()
1274 }
1275
1276 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1277 let Some(send_task) = self.send_task.take() else {
1278 return Task::ready(());
1279 };
1280
1281 for entry in self.entries.iter_mut() {
1282 if let AgentThreadEntry::ToolCall(call) = entry {
1283 let cancel = matches!(
1284 call.status,
1285 ToolCallStatus::WaitingForConfirmation { .. }
1286 | ToolCallStatus::Allowed {
1287 status: acp::ToolCallStatus::InProgress
1288 }
1289 );
1290
1291 if cancel {
1292 call.status = ToolCallStatus::Canceled;
1293 }
1294 }
1295 }
1296
1297 self.connection.cancel(&self.session_id, cx);
1298
1299 // Wait for the send task to complete
1300 cx.foreground_executor().spawn(send_task)
1301 }
1302
1303 /// Rewinds this thread to before the entry at `index`, removing it and all
1304 /// subsequent entries while reverting any changes made from that point.
1305 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1306 let Some(session_editor) = self.connection.session_editor(&self.session_id, cx) else {
1307 return Task::ready(Err(anyhow!("not supported")));
1308 };
1309 let Some(message) = self.user_message(&id) else {
1310 return Task::ready(Err(anyhow!("message not found")));
1311 };
1312
1313 let checkpoint = message
1314 .checkpoint
1315 .as_ref()
1316 .map(|c| c.git_checkpoint.clone());
1317
1318 let git_store = self.project.read(cx).git_store().clone();
1319 cx.spawn(async move |this, cx| {
1320 if let Some(checkpoint) = checkpoint {
1321 git_store
1322 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1323 .await?;
1324 }
1325
1326 cx.update(|cx| session_editor.truncate(id.clone(), cx))?
1327 .await?;
1328 this.update(cx, |this, cx| {
1329 if let Some((ix, _)) = this.user_message_mut(&id) {
1330 let range = ix..this.entries.len();
1331 this.entries.truncate(ix);
1332 cx.emit(AcpThreadEvent::EntriesRemoved(range));
1333 }
1334 })
1335 })
1336 }
1337
1338 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1339 let git_store = self.project.read(cx).git_store().clone();
1340
1341 let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1342 if let Some(checkpoint) = message.checkpoint.as_ref() {
1343 checkpoint.git_checkpoint.clone()
1344 } else {
1345 return Task::ready(Ok(()));
1346 }
1347 } else {
1348 return Task::ready(Ok(()));
1349 };
1350
1351 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1352 cx.spawn(async move |this, cx| {
1353 let new_checkpoint = new_checkpoint
1354 .await
1355 .context("failed to get new checkpoint")
1356 .log_err();
1357 if let Some(new_checkpoint) = new_checkpoint {
1358 let equal = git_store
1359 .update(cx, |git, cx| {
1360 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1361 })?
1362 .await
1363 .unwrap_or(true);
1364 this.update(cx, |this, cx| {
1365 let (ix, message) = this.last_user_message().context("no user message")?;
1366 let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1367 checkpoint.show = !equal;
1368 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1369 anyhow::Ok(())
1370 })??;
1371 }
1372
1373 Ok(())
1374 })
1375 }
1376
1377 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1378 self.entries
1379 .iter_mut()
1380 .enumerate()
1381 .rev()
1382 .find_map(|(ix, entry)| {
1383 if let AgentThreadEntry::UserMessage(message) = entry {
1384 Some((ix, message))
1385 } else {
1386 None
1387 }
1388 })
1389 }
1390
1391 fn user_message(&self, id: &UserMessageId) -> Option<&UserMessage> {
1392 self.entries.iter().find_map(|entry| {
1393 if let AgentThreadEntry::UserMessage(message) = entry {
1394 if message.id.as_ref() == Some(&id) {
1395 Some(message)
1396 } else {
1397 None
1398 }
1399 } else {
1400 None
1401 }
1402 })
1403 }
1404
1405 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1406 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1407 if let AgentThreadEntry::UserMessage(message) = entry {
1408 if message.id.as_ref() == Some(&id) {
1409 Some((ix, message))
1410 } else {
1411 None
1412 }
1413 } else {
1414 None
1415 }
1416 })
1417 }
1418
1419 pub fn read_text_file(
1420 &self,
1421 path: PathBuf,
1422 line: Option<u32>,
1423 limit: Option<u32>,
1424 reuse_shared_snapshot: bool,
1425 cx: &mut Context<Self>,
1426 ) -> Task<Result<String>> {
1427 let project = self.project.clone();
1428 let action_log = self.action_log.clone();
1429 cx.spawn(async move |this, cx| {
1430 let load = project.update(cx, |project, cx| {
1431 let path = project
1432 .project_path_for_absolute_path(&path, cx)
1433 .context("invalid path")?;
1434 anyhow::Ok(project.open_buffer(path, cx))
1435 });
1436 let buffer = load??.await?;
1437
1438 let snapshot = if reuse_shared_snapshot {
1439 this.read_with(cx, |this, _| {
1440 this.shared_buffers.get(&buffer.clone()).cloned()
1441 })
1442 .log_err()
1443 .flatten()
1444 } else {
1445 None
1446 };
1447
1448 let snapshot = if let Some(snapshot) = snapshot {
1449 snapshot
1450 } else {
1451 action_log.update(cx, |action_log, cx| {
1452 action_log.buffer_read(buffer.clone(), cx);
1453 })?;
1454 project.update(cx, |project, cx| {
1455 let position = buffer
1456 .read(cx)
1457 .snapshot()
1458 .anchor_before(Point::new(line.unwrap_or_default(), 0));
1459 project.set_agent_location(
1460 Some(AgentLocation {
1461 buffer: buffer.downgrade(),
1462 position,
1463 }),
1464 cx,
1465 );
1466 })?;
1467
1468 buffer.update(cx, |buffer, _| buffer.snapshot())?
1469 };
1470
1471 this.update(cx, |this, _| {
1472 let text = snapshot.text();
1473 this.shared_buffers.insert(buffer.clone(), snapshot);
1474 if line.is_none() && limit.is_none() {
1475 return Ok(text);
1476 }
1477 let limit = limit.unwrap_or(u32::MAX) as usize;
1478 let Some(line) = line else {
1479 return Ok(text.lines().take(limit).collect::<String>());
1480 };
1481
1482 let count = text.lines().count();
1483 if count < line as usize {
1484 anyhow::bail!("There are only {} lines", count);
1485 }
1486 Ok(text
1487 .lines()
1488 .skip(line as usize + 1)
1489 .take(limit)
1490 .collect::<String>())
1491 })?
1492 })
1493 }
1494
1495 pub fn write_text_file(
1496 &self,
1497 path: PathBuf,
1498 content: String,
1499 cx: &mut Context<Self>,
1500 ) -> Task<Result<()>> {
1501 let project = self.project.clone();
1502 let action_log = self.action_log.clone();
1503 cx.spawn(async move |this, cx| {
1504 let load = project.update(cx, |project, cx| {
1505 let path = project
1506 .project_path_for_absolute_path(&path, cx)
1507 .context("invalid path")?;
1508 anyhow::Ok(project.open_buffer(path, cx))
1509 });
1510 let buffer = load??.await?;
1511 let snapshot = this.update(cx, |this, cx| {
1512 this.shared_buffers
1513 .get(&buffer)
1514 .cloned()
1515 .unwrap_or_else(|| buffer.read(cx).snapshot())
1516 })?;
1517 let edits = cx
1518 .background_executor()
1519 .spawn(async move {
1520 let old_text = snapshot.text();
1521 text_diff(old_text.as_str(), &content)
1522 .into_iter()
1523 .map(|(range, replacement)| {
1524 (
1525 snapshot.anchor_after(range.start)
1526 ..snapshot.anchor_before(range.end),
1527 replacement,
1528 )
1529 })
1530 .collect::<Vec<_>>()
1531 })
1532 .await;
1533 cx.update(|cx| {
1534 project.update(cx, |project, cx| {
1535 project.set_agent_location(
1536 Some(AgentLocation {
1537 buffer: buffer.downgrade(),
1538 position: edits
1539 .last()
1540 .map(|(range, _)| range.end)
1541 .unwrap_or(Anchor::MIN),
1542 }),
1543 cx,
1544 );
1545 });
1546
1547 action_log.update(cx, |action_log, cx| {
1548 action_log.buffer_read(buffer.clone(), cx);
1549 });
1550 buffer.update(cx, |buffer, cx| {
1551 buffer.edit(edits, None, cx);
1552 });
1553 action_log.update(cx, |action_log, cx| {
1554 action_log.buffer_edited(buffer.clone(), cx);
1555 });
1556 })?;
1557 project
1558 .update(cx, |project, cx| project.save_buffer(buffer, cx))?
1559 .await
1560 })
1561 }
1562
1563 pub fn to_markdown(&self, cx: &App) -> String {
1564 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
1565 }
1566
1567 pub fn emit_server_exited(&mut self, status: ExitStatus, cx: &mut Context<Self>) {
1568 cx.emit(AcpThreadEvent::ServerExited(status));
1569 }
1570}
1571
1572fn markdown_for_raw_output(
1573 raw_output: &serde_json::Value,
1574 language_registry: &Arc<LanguageRegistry>,
1575 cx: &mut App,
1576) -> Option<Entity<Markdown>> {
1577 match raw_output {
1578 serde_json::Value::Null => None,
1579 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
1580 Markdown::new(
1581 value.to_string().into(),
1582 Some(language_registry.clone()),
1583 None,
1584 cx,
1585 )
1586 })),
1587 serde_json::Value::Number(value) => Some(cx.new(|cx| {
1588 Markdown::new(
1589 value.to_string().into(),
1590 Some(language_registry.clone()),
1591 None,
1592 cx,
1593 )
1594 })),
1595 serde_json::Value::String(value) => Some(cx.new(|cx| {
1596 Markdown::new(
1597 value.clone().into(),
1598 Some(language_registry.clone()),
1599 None,
1600 cx,
1601 )
1602 })),
1603 value => Some(cx.new(|cx| {
1604 Markdown::new(
1605 format!("```json\n{}\n```", value).into(),
1606 Some(language_registry.clone()),
1607 None,
1608 cx,
1609 )
1610 })),
1611 }
1612}
1613
1614#[cfg(test)]
1615mod tests {
1616 use super::*;
1617 use anyhow::anyhow;
1618 use futures::{channel::mpsc, future::LocalBoxFuture, select};
1619 use gpui::{AsyncApp, TestAppContext, WeakEntity};
1620 use indoc::indoc;
1621 use project::{FakeFs, Fs};
1622 use rand::Rng as _;
1623 use serde_json::json;
1624 use settings::SettingsStore;
1625 use smol::stream::StreamExt as _;
1626 use std::{
1627 any::Any,
1628 cell::RefCell,
1629 path::Path,
1630 rc::Rc,
1631 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
1632 time::Duration,
1633 };
1634 use util::path;
1635
1636 fn init_test(cx: &mut TestAppContext) {
1637 env_logger::try_init().ok();
1638 cx.update(|cx| {
1639 let settings_store = SettingsStore::test(cx);
1640 cx.set_global(settings_store);
1641 Project::init_settings(cx);
1642 language::init(cx);
1643 });
1644 }
1645
1646 #[gpui::test]
1647 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
1648 init_test(cx);
1649
1650 let fs = FakeFs::new(cx.executor());
1651 let project = Project::test(fs, [], cx).await;
1652 let connection = Rc::new(FakeAgentConnection::new());
1653 let thread = cx
1654 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1655 .await
1656 .unwrap();
1657
1658 // Test creating a new user message
1659 thread.update(cx, |thread, cx| {
1660 thread.push_user_content_block(
1661 None,
1662 acp::ContentBlock::Text(acp::TextContent {
1663 annotations: None,
1664 text: "Hello, ".to_string(),
1665 }),
1666 cx,
1667 );
1668 });
1669
1670 thread.update(cx, |thread, cx| {
1671 assert_eq!(thread.entries.len(), 1);
1672 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1673 assert_eq!(user_msg.id, None);
1674 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
1675 } else {
1676 panic!("Expected UserMessage");
1677 }
1678 });
1679
1680 // Test appending to existing user message
1681 let message_1_id = UserMessageId::new();
1682 thread.update(cx, |thread, cx| {
1683 thread.push_user_content_block(
1684 Some(message_1_id.clone()),
1685 acp::ContentBlock::Text(acp::TextContent {
1686 annotations: None,
1687 text: "world!".to_string(),
1688 }),
1689 cx,
1690 );
1691 });
1692
1693 thread.update(cx, |thread, cx| {
1694 assert_eq!(thread.entries.len(), 1);
1695 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
1696 assert_eq!(user_msg.id, Some(message_1_id));
1697 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
1698 } else {
1699 panic!("Expected UserMessage");
1700 }
1701 });
1702
1703 // Test creating new user message after assistant message
1704 thread.update(cx, |thread, cx| {
1705 thread.push_assistant_content_block(
1706 acp::ContentBlock::Text(acp::TextContent {
1707 annotations: None,
1708 text: "Assistant response".to_string(),
1709 }),
1710 false,
1711 cx,
1712 );
1713 });
1714
1715 let message_2_id = UserMessageId::new();
1716 thread.update(cx, |thread, cx| {
1717 thread.push_user_content_block(
1718 Some(message_2_id.clone()),
1719 acp::ContentBlock::Text(acp::TextContent {
1720 annotations: None,
1721 text: "New user message".to_string(),
1722 }),
1723 cx,
1724 );
1725 });
1726
1727 thread.update(cx, |thread, cx| {
1728 assert_eq!(thread.entries.len(), 3);
1729 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
1730 assert_eq!(user_msg.id, Some(message_2_id));
1731 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
1732 } else {
1733 panic!("Expected UserMessage at index 2");
1734 }
1735 });
1736 }
1737
1738 #[gpui::test]
1739 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
1740 init_test(cx);
1741
1742 let fs = FakeFs::new(cx.executor());
1743 let project = Project::test(fs, [], cx).await;
1744 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1745 |_, thread, mut cx| {
1746 async move {
1747 thread.update(&mut cx, |thread, cx| {
1748 thread
1749 .handle_session_update(
1750 acp::SessionUpdate::AgentThoughtChunk {
1751 content: "Thinking ".into(),
1752 },
1753 cx,
1754 )
1755 .unwrap();
1756 thread
1757 .handle_session_update(
1758 acp::SessionUpdate::AgentThoughtChunk {
1759 content: "hard!".into(),
1760 },
1761 cx,
1762 )
1763 .unwrap();
1764 })?;
1765 Ok(acp::PromptResponse {
1766 stop_reason: acp::StopReason::EndTurn,
1767 })
1768 }
1769 .boxed_local()
1770 },
1771 ));
1772
1773 let thread = cx
1774 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1775 .await
1776 .unwrap();
1777
1778 thread
1779 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
1780 .await
1781 .unwrap();
1782
1783 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
1784 assert_eq!(
1785 output,
1786 indoc! {r#"
1787 ## User
1788
1789 Hello from Zed!
1790
1791 ## Assistant
1792
1793 <thinking>
1794 Thinking hard!
1795 </thinking>
1796
1797 "#}
1798 );
1799 }
1800
1801 #[gpui::test]
1802 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
1803 init_test(cx);
1804
1805 let fs = FakeFs::new(cx.executor());
1806 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
1807 .await;
1808 let project = Project::test(fs.clone(), [], cx).await;
1809 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
1810 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
1811 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
1812 move |_, thread, mut cx| {
1813 let read_file_tx = read_file_tx.clone();
1814 async move {
1815 let content = thread
1816 .update(&mut cx, |thread, cx| {
1817 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
1818 })
1819 .unwrap()
1820 .await
1821 .unwrap();
1822 assert_eq!(content, "one\ntwo\nthree\n");
1823 read_file_tx.take().unwrap().send(()).unwrap();
1824 thread
1825 .update(&mut cx, |thread, cx| {
1826 thread.write_text_file(
1827 path!("/tmp/foo").into(),
1828 "one\ntwo\nthree\nfour\nfive\n".to_string(),
1829 cx,
1830 )
1831 })
1832 .unwrap()
1833 .await
1834 .unwrap();
1835 Ok(acp::PromptResponse {
1836 stop_reason: acp::StopReason::EndTurn,
1837 })
1838 }
1839 .boxed_local()
1840 },
1841 ));
1842
1843 let (worktree, pathbuf) = project
1844 .update(cx, |project, cx| {
1845 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
1846 })
1847 .await
1848 .unwrap();
1849 let buffer = project
1850 .update(cx, |project, cx| {
1851 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
1852 })
1853 .await
1854 .unwrap();
1855
1856 let thread = cx
1857 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
1858 .await
1859 .unwrap();
1860
1861 let request = thread.update(cx, |thread, cx| {
1862 thread.send_raw("Extend the count in /tmp/foo", cx)
1863 });
1864 read_file_rx.await.ok();
1865 buffer.update(cx, |buffer, cx| {
1866 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
1867 });
1868 cx.run_until_parked();
1869 assert_eq!(
1870 buffer.read_with(cx, |buffer, _| buffer.text()),
1871 "zero\none\ntwo\nthree\nfour\nfive\n"
1872 );
1873 assert_eq!(
1874 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
1875 "zero\none\ntwo\nthree\nfour\nfive\n"
1876 );
1877 request.await.unwrap();
1878 }
1879
1880 #[gpui::test]
1881 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
1882 init_test(cx);
1883
1884 let fs = FakeFs::new(cx.executor());
1885 let project = Project::test(fs, [], cx).await;
1886 let id = acp::ToolCallId("test".into());
1887
1888 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
1889 let id = id.clone();
1890 move |_, thread, mut cx| {
1891 let id = id.clone();
1892 async move {
1893 thread
1894 .update(&mut cx, |thread, cx| {
1895 thread.handle_session_update(
1896 acp::SessionUpdate::ToolCall(acp::ToolCall {
1897 id: id.clone(),
1898 title: "Label".into(),
1899 kind: acp::ToolKind::Fetch,
1900 status: acp::ToolCallStatus::InProgress,
1901 content: vec![],
1902 locations: vec![],
1903 raw_input: None,
1904 raw_output: None,
1905 }),
1906 cx,
1907 )
1908 })
1909 .unwrap()
1910 .unwrap();
1911 Ok(acp::PromptResponse {
1912 stop_reason: acp::StopReason::EndTurn,
1913 })
1914 }
1915 .boxed_local()
1916 }
1917 }));
1918
1919 let thread = cx
1920 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
1921 .await
1922 .unwrap();
1923
1924 let request = thread.update(cx, |thread, cx| {
1925 thread.send_raw("Fetch https://example.com", cx)
1926 });
1927
1928 run_until_first_tool_call(&thread, cx).await;
1929
1930 thread.read_with(cx, |thread, _| {
1931 assert!(matches!(
1932 thread.entries[1],
1933 AgentThreadEntry::ToolCall(ToolCall {
1934 status: ToolCallStatus::Allowed {
1935 status: acp::ToolCallStatus::InProgress,
1936 ..
1937 },
1938 ..
1939 })
1940 ));
1941 });
1942
1943 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
1944
1945 thread.read_with(cx, |thread, _| {
1946 assert!(matches!(
1947 &thread.entries[1],
1948 AgentThreadEntry::ToolCall(ToolCall {
1949 status: ToolCallStatus::Canceled,
1950 ..
1951 })
1952 ));
1953 });
1954
1955 thread
1956 .update(cx, |thread, cx| {
1957 thread.handle_session_update(
1958 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
1959 id,
1960 fields: acp::ToolCallUpdateFields {
1961 status: Some(acp::ToolCallStatus::Completed),
1962 ..Default::default()
1963 },
1964 }),
1965 cx,
1966 )
1967 })
1968 .unwrap();
1969
1970 request.await.unwrap();
1971
1972 thread.read_with(cx, |thread, _| {
1973 assert!(matches!(
1974 thread.entries[1],
1975 AgentThreadEntry::ToolCall(ToolCall {
1976 status: ToolCallStatus::Allowed {
1977 status: acp::ToolCallStatus::Completed,
1978 ..
1979 },
1980 ..
1981 })
1982 ));
1983 });
1984 }
1985
1986 #[gpui::test]
1987 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
1988 init_test(cx);
1989 let fs = FakeFs::new(cx.background_executor.clone());
1990 fs.insert_tree(path!("/test"), json!({})).await;
1991 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
1992
1993 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
1994 move |_, thread, mut cx| {
1995 async move {
1996 thread
1997 .update(&mut cx, |thread, cx| {
1998 thread.handle_session_update(
1999 acp::SessionUpdate::ToolCall(acp::ToolCall {
2000 id: acp::ToolCallId("test".into()),
2001 title: "Label".into(),
2002 kind: acp::ToolKind::Edit,
2003 status: acp::ToolCallStatus::Completed,
2004 content: vec![acp::ToolCallContent::Diff {
2005 diff: acp::Diff {
2006 path: "/test/test.txt".into(),
2007 old_text: None,
2008 new_text: "foo".into(),
2009 },
2010 }],
2011 locations: vec![],
2012 raw_input: None,
2013 raw_output: None,
2014 }),
2015 cx,
2016 )
2017 })
2018 .unwrap()
2019 .unwrap();
2020 Ok(acp::PromptResponse {
2021 stop_reason: acp::StopReason::EndTurn,
2022 })
2023 }
2024 .boxed_local()
2025 }
2026 }));
2027
2028 let thread = cx
2029 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2030 .await
2031 .unwrap();
2032
2033 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
2034 .await
2035 .unwrap();
2036
2037 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
2038 }
2039
2040 #[gpui::test(iterations = 10)]
2041 async fn test_checkpoints(cx: &mut TestAppContext) {
2042 init_test(cx);
2043 let fs = FakeFs::new(cx.background_executor.clone());
2044 fs.insert_tree(
2045 path!("/test"),
2046 json!({
2047 ".git": {}
2048 }),
2049 )
2050 .await;
2051 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
2052
2053 let simulate_changes = Arc::new(AtomicBool::new(true));
2054 let next_filename = Arc::new(AtomicUsize::new(0));
2055 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2056 let simulate_changes = simulate_changes.clone();
2057 let next_filename = next_filename.clone();
2058 let fs = fs.clone();
2059 move |request, thread, mut cx| {
2060 let fs = fs.clone();
2061 let simulate_changes = simulate_changes.clone();
2062 let next_filename = next_filename.clone();
2063 async move {
2064 if simulate_changes.load(SeqCst) {
2065 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
2066 fs.write(Path::new(&filename), b"").await?;
2067 }
2068
2069 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
2070 panic!("expected text content block");
2071 };
2072 thread.update(&mut cx, |thread, cx| {
2073 thread
2074 .handle_session_update(
2075 acp::SessionUpdate::AgentMessageChunk {
2076 content: content.text.to_uppercase().into(),
2077 },
2078 cx,
2079 )
2080 .unwrap();
2081 })?;
2082 Ok(acp::PromptResponse {
2083 stop_reason: acp::StopReason::EndTurn,
2084 })
2085 }
2086 .boxed_local()
2087 }
2088 }));
2089 let thread = cx
2090 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2091 .await
2092 .unwrap();
2093
2094 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
2095 .await
2096 .unwrap();
2097 thread.read_with(cx, |thread, cx| {
2098 assert_eq!(
2099 thread.to_markdown(cx),
2100 indoc! {"
2101 ## User (checkpoint)
2102
2103 Lorem
2104
2105 ## Assistant
2106
2107 LOREM
2108
2109 "}
2110 );
2111 });
2112 assert_eq!(fs.files(), vec![Path::new("/test/file-0")]);
2113
2114 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
2115 .await
2116 .unwrap();
2117 thread.read_with(cx, |thread, cx| {
2118 assert_eq!(
2119 thread.to_markdown(cx),
2120 indoc! {"
2121 ## User (checkpoint)
2122
2123 Lorem
2124
2125 ## Assistant
2126
2127 LOREM
2128
2129 ## User (checkpoint)
2130
2131 ipsum
2132
2133 ## Assistant
2134
2135 IPSUM
2136
2137 "}
2138 );
2139 });
2140 assert_eq!(
2141 fs.files(),
2142 vec![Path::new("/test/file-0"), Path::new("/test/file-1")]
2143 );
2144
2145 // Checkpoint isn't stored when there are no changes.
2146 simulate_changes.store(false, SeqCst);
2147 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
2148 .await
2149 .unwrap();
2150 thread.read_with(cx, |thread, cx| {
2151 assert_eq!(
2152 thread.to_markdown(cx),
2153 indoc! {"
2154 ## User (checkpoint)
2155
2156 Lorem
2157
2158 ## Assistant
2159
2160 LOREM
2161
2162 ## User (checkpoint)
2163
2164 ipsum
2165
2166 ## Assistant
2167
2168 IPSUM
2169
2170 ## User
2171
2172 dolor
2173
2174 ## Assistant
2175
2176 DOLOR
2177
2178 "}
2179 );
2180 });
2181 assert_eq!(
2182 fs.files(),
2183 vec![Path::new("/test/file-0"), Path::new("/test/file-1")]
2184 );
2185
2186 // Rewinding the conversation truncates the history and restores the checkpoint.
2187 thread
2188 .update(cx, |thread, cx| {
2189 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
2190 panic!("unexpected entries {:?}", thread.entries)
2191 };
2192 thread.rewind(message.id.clone().unwrap(), cx)
2193 })
2194 .await
2195 .unwrap();
2196 thread.read_with(cx, |thread, cx| {
2197 assert_eq!(
2198 thread.to_markdown(cx),
2199 indoc! {"
2200 ## User (checkpoint)
2201
2202 Lorem
2203
2204 ## Assistant
2205
2206 LOREM
2207
2208 "}
2209 );
2210 });
2211 assert_eq!(fs.files(), vec![Path::new("/test/file-0")]);
2212 }
2213
2214 async fn run_until_first_tool_call(
2215 thread: &Entity<AcpThread>,
2216 cx: &mut TestAppContext,
2217 ) -> usize {
2218 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
2219
2220 let subscription = cx.update(|cx| {
2221 cx.subscribe(thread, move |thread, _, cx| {
2222 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
2223 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
2224 return tx.try_send(ix).unwrap();
2225 }
2226 }
2227 })
2228 });
2229
2230 select! {
2231 _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
2232 panic!("Timeout waiting for tool call")
2233 }
2234 ix = rx.next().fuse() => {
2235 drop(subscription);
2236 ix.unwrap()
2237 }
2238 }
2239 }
2240
2241 #[derive(Clone, Default)]
2242 struct FakeAgentConnection {
2243 auth_methods: Vec<acp::AuthMethod>,
2244 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
2245 on_user_message: Option<
2246 Rc<
2247 dyn Fn(
2248 acp::PromptRequest,
2249 WeakEntity<AcpThread>,
2250 AsyncApp,
2251 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2252 + 'static,
2253 >,
2254 >,
2255 }
2256
2257 impl FakeAgentConnection {
2258 fn new() -> Self {
2259 Self {
2260 auth_methods: Vec::new(),
2261 on_user_message: None,
2262 sessions: Arc::default(),
2263 }
2264 }
2265
2266 #[expect(unused)]
2267 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
2268 self.auth_methods = auth_methods;
2269 self
2270 }
2271
2272 fn on_user_message(
2273 mut self,
2274 handler: impl Fn(
2275 acp::PromptRequest,
2276 WeakEntity<AcpThread>,
2277 AsyncApp,
2278 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
2279 + 'static,
2280 ) -> Self {
2281 self.on_user_message.replace(Rc::new(handler));
2282 self
2283 }
2284 }
2285
2286 impl AgentConnection for FakeAgentConnection {
2287 fn auth_methods(&self) -> &[acp::AuthMethod] {
2288 &self.auth_methods
2289 }
2290
2291 fn new_thread(
2292 self: Rc<Self>,
2293 project: Entity<Project>,
2294 _cwd: &Path,
2295 cx: &mut gpui::App,
2296 ) -> Task<gpui::Result<Entity<AcpThread>>> {
2297 let session_id = acp::SessionId(
2298 rand::thread_rng()
2299 .sample_iter(&rand::distributions::Alphanumeric)
2300 .take(7)
2301 .map(char::from)
2302 .collect::<String>()
2303 .into(),
2304 );
2305 let thread =
2306 cx.new(|cx| AcpThread::new("Test", self.clone(), project, session_id.clone(), cx));
2307 self.sessions.lock().insert(session_id, thread.downgrade());
2308 Task::ready(Ok(thread))
2309 }
2310
2311 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
2312 if self.auth_methods().iter().any(|m| m.id == method) {
2313 Task::ready(Ok(()))
2314 } else {
2315 Task::ready(Err(anyhow!("Invalid Auth Method")))
2316 }
2317 }
2318
2319 fn prompt(
2320 &self,
2321 _id: Option<UserMessageId>,
2322 params: acp::PromptRequest,
2323 cx: &mut App,
2324 ) -> Task<gpui::Result<acp::PromptResponse>> {
2325 let sessions = self.sessions.lock();
2326 let thread = sessions.get(¶ms.session_id).unwrap();
2327 if let Some(handler) = &self.on_user_message {
2328 let handler = handler.clone();
2329 let thread = thread.clone();
2330 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
2331 } else {
2332 Task::ready(Ok(acp::PromptResponse {
2333 stop_reason: acp::StopReason::EndTurn,
2334 }))
2335 }
2336 }
2337
2338 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
2339 let sessions = self.sessions.lock();
2340 let thread = sessions.get(&session_id).unwrap().clone();
2341
2342 cx.spawn(async move |cx| {
2343 thread
2344 .update(cx, |thread, cx| thread.cancel(cx))
2345 .unwrap()
2346 .await
2347 })
2348 .detach();
2349 }
2350
2351 fn session_editor(
2352 &self,
2353 session_id: &acp::SessionId,
2354 _cx: &mut App,
2355 ) -> Option<Rc<dyn AgentSessionEditor>> {
2356 Some(Rc::new(FakeAgentSessionEditor {
2357 _session_id: session_id.clone(),
2358 }))
2359 }
2360
2361 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
2362 self
2363 }
2364 }
2365
2366 struct FakeAgentSessionEditor {
2367 _session_id: acp::SessionId,
2368 }
2369
2370 impl AgentSessionEditor for FakeAgentSessionEditor {
2371 fn truncate(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
2372 Task::ready(Ok(()))
2373 }
2374 }
2375}