1use std::fmt::Write as _;
2use std::io::Write;
3use std::ops::Range;
4use std::sync::Arc;
5use std::time::Instant;
6
7use anyhow::{Result, anyhow};
8use assistant_settings::AssistantSettings;
9use assistant_tool::{ActionLog, AnyToolCard, Tool, ToolWorkingSet};
10use chrono::{DateTime, Utc};
11use collections::HashMap;
12use editor::display_map::CreaseMetadata;
13use feature_flags::{self, FeatureFlagAppExt};
14use futures::future::Shared;
15use futures::{FutureExt, StreamExt as _};
16use git::repository::DiffType;
17use gpui::{
18 AnyWindowHandle, App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task,
19 WeakEntity,
20};
21use language_model::{
22 ConfiguredModel, LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent,
23 LanguageModelId, LanguageModelKnownError, LanguageModelRegistry, LanguageModelRequest,
24 LanguageModelRequestMessage, LanguageModelRequestTool, LanguageModelToolResult,
25 LanguageModelToolUseId, MaxMonthlySpendReachedError, MessageContent,
26 ModelRequestLimitReachedError, PaymentRequiredError, RequestUsage, Role, SelectedModel,
27 StopReason, TokenUsage,
28};
29use postage::stream::Stream as _;
30use project::Project;
31use project::git_store::{GitStore, GitStoreCheckpoint, RepositoryState};
32use prompt_store::{ModelContext, PromptBuilder};
33use proto::Plan;
34use schemars::JsonSchema;
35use serde::{Deserialize, Serialize};
36use settings::Settings;
37use thiserror::Error;
38use util::{ResultExt as _, TryFutureExt as _, post_inc};
39use uuid::Uuid;
40use zed_llm_client::CompletionMode;
41
42use crate::ThreadStore;
43use crate::context::{AgentContext, AgentContextHandle, ContextLoadResult, LoadedContext};
44use crate::thread_store::{
45 SerializedCrease, SerializedLanguageModel, SerializedMessage, SerializedMessageSegment,
46 SerializedThread, SerializedToolResult, SerializedToolUse, SharedProjectContext,
47};
48use crate::tool_use::{PendingToolUse, ToolUse, ToolUseMetadata, ToolUseState};
49
50#[derive(
51 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize, JsonSchema,
52)]
53pub struct ThreadId(Arc<str>);
54
55impl ThreadId {
56 pub fn new() -> Self {
57 Self(Uuid::new_v4().to_string().into())
58 }
59}
60
61impl std::fmt::Display for ThreadId {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 write!(f, "{}", self.0)
64 }
65}
66
67impl From<&str> for ThreadId {
68 fn from(value: &str) -> Self {
69 Self(value.into())
70 }
71}
72
73/// The ID of the user prompt that initiated a request.
74///
75/// This equates to the user physically submitting a message to the model (e.g., by pressing the Enter key).
76#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
77pub struct PromptId(Arc<str>);
78
79impl PromptId {
80 pub fn new() -> Self {
81 Self(Uuid::new_v4().to_string().into())
82 }
83}
84
85impl std::fmt::Display for PromptId {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 write!(f, "{}", self.0)
88 }
89}
90
91#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, Serialize, Deserialize)]
92pub struct MessageId(pub(crate) usize);
93
94impl MessageId {
95 fn post_inc(&mut self) -> Self {
96 Self(post_inc(&mut self.0))
97 }
98}
99
/// Stored information that can be used to resurrect a context crease when creating an editor for a past message.
#[derive(Clone, Debug)]
pub struct MessageCrease {
    /// Range of the crease within the message text (presumably byte offsets — verify at call sites).
    pub range: Range<usize>,
    /// Icon and label used to render the collapsed crease.
    pub metadata: CreaseMetadata,
    /// None for a deserialized message, Some otherwise.
    pub context: Option<AgentContextHandle>,
}

/// A message in a [`Thread`].
#[derive(Debug, Clone)]
pub struct Message {
    pub id: MessageId,
    pub role: Role,
    /// Ordered body parts: plain text, thinking, or redacted thinking.
    pub segments: Vec<MessageSegment>,
    /// Context attached to this message when it was created.
    pub loaded_context: LoadedContext,
    /// Creases used to restore collapsed context when editing this message.
    pub creases: Vec<MessageCrease>,
}
118
119impl Message {
120 /// Returns whether the message contains any meaningful text that should be displayed
121 /// The model sometimes runs tool without producing any text or just a marker ([`USING_TOOL_MARKER`])
122 pub fn should_display_content(&self) -> bool {
123 self.segments.iter().all(|segment| segment.should_display())
124 }
125
126 pub fn push_thinking(&mut self, text: &str, signature: Option<String>) {
127 if let Some(MessageSegment::Thinking {
128 text: segment,
129 signature: current_signature,
130 }) = self.segments.last_mut()
131 {
132 if let Some(signature) = signature {
133 *current_signature = Some(signature);
134 }
135 segment.push_str(text);
136 } else {
137 self.segments.push(MessageSegment::Thinking {
138 text: text.to_string(),
139 signature,
140 });
141 }
142 }
143
144 pub fn push_text(&mut self, text: &str) {
145 if let Some(MessageSegment::Text(segment)) = self.segments.last_mut() {
146 segment.push_str(text);
147 } else {
148 self.segments.push(MessageSegment::Text(text.to_string()));
149 }
150 }
151
152 pub fn to_string(&self) -> String {
153 let mut result = String::new();
154
155 if !self.loaded_context.text.is_empty() {
156 result.push_str(&self.loaded_context.text);
157 }
158
159 for segment in &self.segments {
160 match segment {
161 MessageSegment::Text(text) => result.push_str(text),
162 MessageSegment::Thinking { text, .. } => {
163 result.push_str("<think>\n");
164 result.push_str(text);
165 result.push_str("\n</think>");
166 }
167 MessageSegment::RedactedThinking(_) => {}
168 }
169 }
170
171 result
172 }
173}
174
/// One part of a message body.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MessageSegment {
    /// Plain assistant/user text.
    Text(String),
    /// Model "thinking" output, optionally carrying a provider signature.
    Thinking {
        text: String,
        signature: Option<String>,
    },
    /// Opaque provider-redacted thinking payload; never rendered.
    RedactedThinking(Vec<u8>),
}
184
185impl MessageSegment {
186 pub fn should_display(&self) -> bool {
187 match self {
188 Self::Text(text) => text.is_empty(),
189 Self::Thinking { text, .. } => text.is_empty(),
190 Self::RedactedThinking(_) => false,
191 }
192 }
193}
194
/// Point-in-time capture of the project's worktrees, stored with a thread.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProjectSnapshot {
    pub worktree_snapshots: Vec<WorktreeSnapshot>,
    /// Paths of buffers that had unsaved changes at capture time.
    pub unsaved_buffer_paths: Vec<String>,
    pub timestamp: DateTime<Utc>,
}

/// Snapshot of a single worktree, with git state when available.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorktreeSnapshot {
    pub worktree_path: String,
    /// Git metadata; presumably `None` for non-git worktrees — verify at the capture site.
    pub git_state: Option<GitState>,
}

/// Git metadata captured for a worktree snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GitState {
    pub remote_url: Option<String>,
    pub head_sha: Option<String>,
    pub current_branch: Option<String>,
    pub diff: Option<String>,
}

/// Associates a git checkpoint with the message it was taken for.
#[derive(Clone)]
pub struct ThreadCheckpoint {
    message_id: MessageId,
    git_checkpoint: GitStoreCheckpoint,
}

/// Thumbs-up / thumbs-down feedback on a thread or message.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ThreadFeedback {
    Positive,
    Negative,
}
227
/// Status of the most recent checkpoint-restore operation.
pub enum LastRestoreCheckpoint {
    /// Restore is still running for the given message.
    Pending {
        message_id: MessageId,
    },
    /// Restore failed; `error` holds the user-visible description.
    Error {
        message_id: MessageId,
        error: String,
    },
}
237
238impl LastRestoreCheckpoint {
239 pub fn message_id(&self) -> MessageId {
240 match self {
241 LastRestoreCheckpoint::Pending { message_id } => *message_id,
242 LastRestoreCheckpoint::Error { message_id, .. } => *message_id,
243 }
244 }
245}
246
/// Lifecycle of the thread's detailed (long-form) summary.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub enum DetailedSummaryState {
    #[default]
    NotGenerated,
    /// Generation in progress, keyed by the message it was requested at.
    Generating {
        message_id: MessageId,
    },
    /// Generation finished; `text` is the summary.
    Generated {
        text: SharedString,
        message_id: MessageId,
    },
}
259
260impl DetailedSummaryState {
261 fn text(&self) -> Option<SharedString> {
262 if let Self::Generated { text, .. } = self {
263 Some(text.clone())
264 } else {
265 None
266 }
267 }
268}
269
/// Running token count measured against a model's context window.
#[derive(Default)]
pub struct TotalTokenUsage {
    /// Tokens consumed so far.
    pub total: usize,
    /// Context-window size; 0 when no model is selected (see `ratio`).
    pub max: usize,
}
275
276impl TotalTokenUsage {
277 pub fn ratio(&self) -> TokenUsageRatio {
278 #[cfg(debug_assertions)]
279 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
280 .unwrap_or("0.8".to_string())
281 .parse()
282 .unwrap();
283 #[cfg(not(debug_assertions))]
284 let warning_threshold: f32 = 0.8;
285
286 // When the maximum is unknown because there is no selected model,
287 // avoid showing the token limit warning.
288 if self.max == 0 {
289 TokenUsageRatio::Normal
290 } else if self.total >= self.max {
291 TokenUsageRatio::Exceeded
292 } else if self.total as f32 / self.max as f32 >= warning_threshold {
293 TokenUsageRatio::Warning
294 } else {
295 TokenUsageRatio::Normal
296 }
297 }
298
299 pub fn add(&self, tokens: usize) -> TotalTokenUsage {
300 TotalTokenUsage {
301 total: self.total + tokens,
302 max: self.max,
303 }
304 }
305}
306
/// Bucketed result of [`TotalTokenUsage::ratio`].
#[derive(Debug, Default, PartialEq, Eq)]
pub enum TokenUsageRatio {
    #[default]
    Normal,
    /// At or above the warning threshold but below the limit.
    Warning,
    /// At or above the context-window limit.
    Exceeded,
}

/// Initial completion mode for new threads: staff users default to `Max`,
/// everyone else to `Normal`.
fn default_completion_mode(cx: &App) -> CompletionMode {
    if cx.is_staff() {
        CompletionMode::Max
    } else {
        CompletionMode::Normal
    }
}
322
/// A thread of conversation with the LLM.
pub struct Thread {
    id: ThreadId,
    updated_at: DateTime<Utc>,
    /// Generated summary; `None` until one has been produced.
    summary: Option<SharedString>,
    // Task presumably driving summary generation (body not shown here).
    pending_summary: Task<Option<()>>,
    detailed_summary_task: Task<Option<()>>,
    /// Watch channel broadcasting detailed-summary generation state.
    detailed_summary_tx: postage::watch::Sender<DetailedSummaryState>,
    detailed_summary_rx: postage::watch::Receiver<DetailedSummaryState>,
    completion_mode: CompletionMode,
    messages: Vec<Message>,
    /// Next ID to assign; monotonic, so `messages` stays ordered by ID.
    next_message_id: MessageId,
    last_prompt_id: PromptId,
    project_context: SharedProjectContext,
    /// Git checkpoints recorded per user message, used for restore.
    checkpoints_by_message: HashMap<MessageId, ThreadCheckpoint>,
    completion_count: usize,
    pending_completions: Vec<PendingCompletion>,
    project: Entity<Project>,
    prompt_builder: Arc<PromptBuilder>,
    tools: Entity<ToolWorkingSet>,
    tool_use: ToolUseState,
    action_log: Entity<ActionLog>,
    last_restore_checkpoint: Option<LastRestoreCheckpoint>,
    /// Checkpoint taken at the latest user message; finalized or discarded
    /// once generation completes (see `finalize_pending_checkpoint`).
    pending_checkpoint: Option<ThreadCheckpoint>,
    /// Project snapshot captured asynchronously at thread creation.
    initial_project_snapshot: Shared<Task<Option<Arc<ProjectSnapshot>>>>,
    /// Per-request token usage history.
    request_token_usage: Vec<TokenUsage>,
    cumulative_token_usage: TokenUsage,
    exceeded_window_error: Option<ExceededWindowError>,
    feedback: Option<ThreadFeedback>,
    message_feedback: HashMap<MessageId, ThreadFeedback>,
    last_auto_capture_at: Option<Instant>,
    /// Optional hook observing each request and its streamed events.
    request_callback: Option<
        Box<dyn FnMut(&LanguageModelRequest, &[Result<LanguageModelCompletionEvent, String>])>,
    >,
    /// Remaining agentic turns; decremented by `send_to_model`.
    remaining_turns: u32,
    configured_model: Option<ConfiguredModel>,
}

/// Recorded when a request exceeded the model's context window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExceededWindowError {
    /// Model used when last message exceeded context window
    model_id: LanguageModelId,
    /// Token count including last message
    token_count: usize,
}
368
369impl Thread {
    /// Creates a new, empty thread with a fresh [`ThreadId`].
    ///
    /// Uses the globally configured default model and kicks off an
    /// asynchronous capture of the initial project snapshot.
    pub fn new(
        project: Entity<Project>,
        tools: Entity<ToolWorkingSet>,
        prompt_builder: Arc<PromptBuilder>,
        system_prompt: SharedProjectContext,
        cx: &mut Context<Self>,
    ) -> Self {
        let (detailed_summary_tx, detailed_summary_rx) = postage::watch::channel();
        let configured_model = LanguageModelRegistry::read_global(cx).default_model();

        Self {
            id: ThreadId::new(),
            updated_at: Utc::now(),
            summary: None,
            pending_summary: Task::ready(None),
            detailed_summary_task: Task::ready(None),
            detailed_summary_tx,
            detailed_summary_rx,
            completion_mode: default_completion_mode(cx),
            messages: Vec::new(),
            next_message_id: MessageId(0),
            last_prompt_id: PromptId::new(),
            project_context: system_prompt,
            checkpoints_by_message: HashMap::default(),
            completion_count: 0,
            pending_completions: Vec::new(),
            project: project.clone(),
            prompt_builder,
            tools: tools.clone(),
            last_restore_checkpoint: None,
            pending_checkpoint: None,
            tool_use: ToolUseState::new(tools.clone()),
            action_log: cx.new(|_| ActionLog::new(project.clone())),
            initial_project_snapshot: {
                // Snapshot capture runs in the background so construction
                // itself stays cheap; the result is shared with `serialize`.
                let project_snapshot = Self::project_snapshot(project, cx);
                cx.foreground_executor()
                    .spawn(async move { Some(project_snapshot.await) })
                    .shared()
            },
            request_token_usage: Vec::new(),
            cumulative_token_usage: TokenUsage::default(),
            exceeded_window_error: None,
            feedback: None,
            message_feedback: HashMap::default(),
            last_auto_capture_at: None,
            request_callback: None,
            remaining_turns: u32::MAX,
            configured_model,
        }
    }
420
    /// Reconstructs a thread from its serialized form.
    ///
    /// Restores messages, tool-use state, and token usage. The serialized
    /// model is re-selected when still available; otherwise the registry's
    /// default model is used.
    pub fn deserialize(
        id: ThreadId,
        serialized: SerializedThread,
        project: Entity<Project>,
        tools: Entity<ToolWorkingSet>,
        prompt_builder: Arc<PromptBuilder>,
        project_context: SharedProjectContext,
        cx: &mut Context<Self>,
    ) -> Self {
        // Continue numbering after the highest stored message ID.
        let next_message_id = MessageId(
            serialized
                .messages
                .last()
                .map(|message| message.id.0 + 1)
                .unwrap_or(0),
        );
        let tool_use = ToolUseState::from_serialized_messages(tools.clone(), &serialized.messages);
        let (detailed_summary_tx, detailed_summary_rx) =
            postage::watch::channel_with(serialized.detailed_summary_state);

        let configured_model = LanguageModelRegistry::global(cx).update(cx, |registry, cx| {
            serialized
                .model
                .and_then(|model| {
                    let model = SelectedModel {
                        provider: model.provider.clone().into(),
                        model: model.model.clone().into(),
                    };
                    registry.select_model(&model, cx)
                })
                .or_else(|| registry.default_model())
        });

        Self {
            id,
            updated_at: serialized.updated_at,
            summary: Some(serialized.summary),
            pending_summary: Task::ready(None),
            detailed_summary_task: Task::ready(None),
            detailed_summary_tx,
            detailed_summary_rx,
            completion_mode: default_completion_mode(cx),
            messages: serialized
                .messages
                .into_iter()
                .map(|message| Message {
                    id: message.id,
                    role: message.role,
                    segments: message
                        .segments
                        .into_iter()
                        .map(|segment| match segment {
                            SerializedMessageSegment::Text { text } => MessageSegment::Text(text),
                            SerializedMessageSegment::Thinking { text, signature } => {
                                MessageSegment::Thinking { text, signature }
                            }
                            SerializedMessageSegment::RedactedThinking { data } => {
                                MessageSegment::RedactedThinking(data)
                            }
                        })
                        .collect(),
                    // Only the flattened context text survives serialization;
                    // structured contexts and images are not persisted.
                    loaded_context: LoadedContext {
                        contexts: Vec::new(),
                        text: message.context,
                        images: Vec::new(),
                    },
                    // Context handles are not serialized, so restored creases
                    // carry `context: None`.
                    creases: message
                        .creases
                        .into_iter()
                        .map(|crease| MessageCrease {
                            range: crease.start..crease.end,
                            metadata: CreaseMetadata {
                                icon_path: crease.icon_path,
                                label: crease.label,
                            },
                            context: None,
                        })
                        .collect(),
                })
                .collect(),
            next_message_id,
            last_prompt_id: PromptId::new(),
            project_context,
            checkpoints_by_message: HashMap::default(),
            completion_count: 0,
            pending_completions: Vec::new(),
            last_restore_checkpoint: None,
            pending_checkpoint: None,
            project: project.clone(),
            prompt_builder,
            tools,
            tool_use,
            action_log: cx.new(|_| ActionLog::new(project)),
            initial_project_snapshot: Task::ready(serialized.initial_project_snapshot).shared(),
            request_token_usage: serialized.request_token_usage,
            cumulative_token_usage: serialized.cumulative_token_usage,
            exceeded_window_error: None,
            feedback: None,
            message_feedback: HashMap::default(),
            last_auto_capture_at: None,
            request_callback: None,
            remaining_turns: u32::MAX,
            configured_model,
        }
    }
526
    /// Installs a hook that observes every completion request along with the
    /// events (or error strings) received in response.
    pub fn set_request_callback(
        &mut self,
        callback: impl 'static
        + FnMut(&LanguageModelRequest, &[Result<LanguageModelCompletionEvent, String>]),
    ) {
        self.request_callback = Some(Box::new(callback));
    }
534
    /// This thread's stable identifier.
    pub fn id(&self) -> &ThreadId {
        &self.id
    }

    /// True when the thread has no messages yet.
    pub fn is_empty(&self) -> bool {
        self.messages.is_empty()
    }

    /// Time of the last modification.
    pub fn updated_at(&self) -> DateTime<Utc> {
        self.updated_at
    }

    /// Marks the thread as modified now.
    pub fn touch_updated_at(&mut self) {
        self.updated_at = Utc::now();
    }

    /// Starts a fresh prompt ID for the next request.
    pub fn advance_prompt_id(&mut self) {
        self.last_prompt_id = PromptId::new();
    }
554
    /// The generated summary, if one exists yet.
    pub fn summary(&self) -> Option<SharedString> {
        self.summary.clone()
    }

    pub fn project_context(&self) -> SharedProjectContext {
        self.project_context.clone()
    }

    /// Returns the configured model, lazily falling back to the global
    /// default when none has been set.
    pub fn get_or_init_configured_model(&mut self, cx: &App) -> Option<ConfiguredModel> {
        if self.configured_model.is_none() {
            self.configured_model = LanguageModelRegistry::read_global(cx).default_model();
        }
        self.configured_model.clone()
    }

    pub fn configured_model(&self) -> Option<ConfiguredModel> {
        self.configured_model.clone()
    }

    pub fn set_configured_model(&mut self, model: Option<ConfiguredModel>, cx: &mut Context<Self>) {
        self.configured_model = model;
        cx.notify();
    }

    /// Placeholder summary used before one is generated.
    pub const DEFAULT_SUMMARY: SharedString = SharedString::new_static("New Thread");

    /// The summary, or [`Self::DEFAULT_SUMMARY`] when none exists yet.
    pub fn summary_or_default(&self) -> SharedString {
        self.summary.clone().unwrap_or(Self::DEFAULT_SUMMARY)
    }
584
    /// Replaces the summary, if one has already been generated.
    ///
    /// Empty input resets to [`Self::DEFAULT_SUMMARY`]. Emits
    /// `SummaryChanged` only when the text actually changes.
    pub fn set_summary(&mut self, new_summary: impl Into<SharedString>, cx: &mut Context<Self>) {
        let Some(current_summary) = &self.summary else {
            // Don't allow setting summary until generated
            return;
        };

        let mut new_summary = new_summary.into();

        if new_summary.is_empty() {
            new_summary = Self::DEFAULT_SUMMARY;
        }

        if current_summary != &new_summary {
            self.summary = Some(new_summary);
            cx.emit(ThreadEvent::SummaryChanged);
        }
    }

    pub fn completion_mode(&self) -> CompletionMode {
        self.completion_mode
    }

    pub fn set_completion_mode(&mut self, mode: CompletionMode) {
        self.completion_mode = mode;
    }
610
    /// Looks up a message by ID.
    ///
    /// Uses binary search: `messages` stays ordered by ID because IDs are
    /// assigned monotonically on insertion.
    pub fn message(&self, id: MessageId) -> Option<&Message> {
        let index = self
            .messages
            .binary_search_by(|message| message.id.cmp(&id))
            .ok()?;

        self.messages.get(index)
    }

    /// Iterates over all messages in order.
    pub fn messages(&self) -> impl ExactSizeIterator<Item = &Message> {
        self.messages.iter()
    }

    /// True while a completion is in flight or tools are still running.
    pub fn is_generating(&self) -> bool {
        !self.pending_completions.is_empty() || !self.all_tools_finished()
    }

    pub fn tools(&self) -> &Entity<ToolWorkingSet> {
        &self.tools
    }

    /// Finds a pending tool use by its ID.
    pub fn pending_tool(&self, id: &LanguageModelToolUseId) -> Option<&PendingToolUse> {
        self.tool_use
            .pending_tool_uses()
            .into_iter()
            .find(|tool_use| &tool_use.id == id)
    }

    /// Pending tool uses that are waiting on user confirmation.
    pub fn tools_needing_confirmation(&self) -> impl Iterator<Item = &PendingToolUse> {
        self.tool_use
            .pending_tool_uses()
            .into_iter()
            .filter(|tool_use| tool_use.status.needs_confirmation())
    }

    pub fn has_pending_tool_uses(&self) -> bool {
        !self.tool_use.pending_tool_uses().is_empty()
    }

    /// The git checkpoint recorded for the given message, if any.
    pub fn checkpoint_for_message(&self, id: MessageId) -> Option<ThreadCheckpoint> {
        self.checkpoints_by_message.get(&id).cloned()
    }
653
    /// Restores the project to the given checkpoint's git state.
    ///
    /// Marks the restore as pending while the git operation runs. On success
    /// the thread is truncated back to the checkpoint's message; on failure
    /// the error is recorded for display.
    pub fn restore_checkpoint(
        &mut self,
        checkpoint: ThreadCheckpoint,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        self.last_restore_checkpoint = Some(LastRestoreCheckpoint::Pending {
            message_id: checkpoint.message_id,
        });
        cx.emit(ThreadEvent::CheckpointChanged);
        cx.notify();

        let git_store = self.project().read(cx).git_store().clone();
        let restore = git_store.update(cx, |git_store, cx| {
            git_store.restore_checkpoint(checkpoint.git_checkpoint.clone(), cx)
        });

        cx.spawn(async move |this, cx| {
            let result = restore.await;
            this.update(cx, |this, cx| {
                if let Err(err) = result.as_ref() {
                    this.last_restore_checkpoint = Some(LastRestoreCheckpoint::Error {
                        message_id: checkpoint.message_id,
                        error: err.to_string(),
                    });
                } else {
                    this.truncate(checkpoint.message_id, cx);
                    this.last_restore_checkpoint = None;
                }
                // Any pending checkpoint is invalidated by the restore.
                this.pending_checkpoint = None;
                cx.emit(ThreadEvent::CheckpointChanged);
                cx.notify();
            })?;
            result
        })
    }
689
    /// Decides whether the checkpoint taken at the last user message is worth
    /// keeping: it is inserted only when the project changed since it was
    /// taken, or when the comparison itself fails.
    fn finalize_pending_checkpoint(&mut self, cx: &mut Context<Self>) {
        // Defer until generation and tool runs have completed.
        let pending_checkpoint = if self.is_generating() {
            return;
        } else if let Some(checkpoint) = self.pending_checkpoint.take() {
            checkpoint
        } else {
            return;
        };

        let git_store = self.project.read(cx).git_store().clone();
        let final_checkpoint = git_store.update(cx, |git_store, cx| git_store.checkpoint(cx));
        cx.spawn(async move |this, cx| match final_checkpoint.await {
            Ok(final_checkpoint) => {
                let equal = git_store
                    .update(cx, |store, cx| {
                        store.compare_checkpoints(
                            pending_checkpoint.git_checkpoint.clone(),
                            final_checkpoint.clone(),
                            cx,
                        )
                    })?
                    .await
                    // Comparison failure counts as "changed" so the
                    // checkpoint is conservatively kept.
                    .unwrap_or(false);

                if !equal {
                    this.update(cx, |this, cx| {
                        this.insert_checkpoint(pending_checkpoint, cx)
                    })?;
                }

                Ok(())
            }
            // If taking the final checkpoint fails, keep the pending one
            // rather than silently dropping it.
            Err(_) => this.update(cx, |this, cx| {
                this.insert_checkpoint(pending_checkpoint, cx)
            }),
        })
        .detach();
    }
728
    /// Records a checkpoint for its message and notifies observers.
    fn insert_checkpoint(&mut self, checkpoint: ThreadCheckpoint, cx: &mut Context<Self>) {
        self.checkpoints_by_message
            .insert(checkpoint.message_id, checkpoint);
        cx.emit(ThreadEvent::CheckpointChanged);
        cx.notify();
    }

    pub fn last_restore_checkpoint(&self) -> Option<&LastRestoreCheckpoint> {
        self.last_restore_checkpoint.as_ref()
    }

    /// Removes the message with `message_id` and everything after it,
    /// dropping any checkpoints recorded for the removed messages.
    pub fn truncate(&mut self, message_id: MessageId, cx: &mut Context<Self>) {
        let Some(message_ix) = self
            .messages
            .iter()
            .rposition(|message| message.id == message_id)
        else {
            return;
        };
        for deleted_message in self.messages.drain(message_ix..) {
            self.checkpoints_by_message.remove(&deleted_message.id);
        }
        cx.notify();
    }

    /// Iterates over the context attached to the given message; empty when
    /// the message does not exist.
    pub fn context_for_message(&self, id: MessageId) -> impl Iterator<Item = &AgentContext> {
        self.messages
            .iter()
            .find(|message| message.id == id)
            .into_iter()
            .flat_map(|message| message.loaded_context.contexts.iter())
    }
761
762 pub fn is_turn_end(&self, ix: usize) -> bool {
763 if self.messages.is_empty() {
764 return false;
765 }
766
767 if !self.is_generating() && ix == self.messages.len() - 1 {
768 return true;
769 }
770
771 let Some(message) = self.messages.get(ix) else {
772 return false;
773 };
774
775 if message.role != Role::Assistant {
776 return false;
777 }
778
779 self.messages
780 .get(ix + 1)
781 .and_then(|message| {
782 self.message(message.id)
783 .map(|next_message| next_message.role == Role::User)
784 })
785 .unwrap_or(false)
786 }
787
    /// Returns whether all of the tool uses have finished running.
    pub fn all_tools_finished(&self) -> bool {
        // If the only pending tool uses left are the ones with errors, then
        // that means that we've finished running all of the pending tools.
        self.tool_use
            .pending_tool_uses()
            .iter()
            .all(|tool_use| tool_use.status.is_error())
    }

    /// Tool uses issued by the assistant message with the given ID.
    pub fn tool_uses_for_message(&self, id: MessageId, cx: &App) -> Vec<ToolUse> {
        self.tool_use.tool_uses_for_message(id, cx)
    }

    /// Tool results associated with the given assistant message.
    pub fn tool_results_for_message(
        &self,
        assistant_message_id: MessageId,
    ) -> Vec<&LanguageModelToolResult> {
        self.tool_use.tool_results_for_message(assistant_message_id)
    }

    pub fn tool_result(&self, id: &LanguageModelToolUseId) -> Option<&LanguageModelToolResult> {
        self.tool_use.tool_result(id)
    }

    /// The raw output of a completed tool use, if present.
    pub fn output_for_tool(&self, id: &LanguageModelToolUseId) -> Option<&Arc<str>> {
        Some(&self.tool_use.tool_result(id)?.content)
    }

    /// The rendered card for a tool result, if the tool produced one.
    pub fn card_for_tool(&self, id: &LanguageModelToolUseId) -> Option<AnyToolCard> {
        self.tool_use.tool_result_card(id).cloned()
    }
820
    /// Return tools that are both enabled and supported by the model
    pub fn available_tools(
        &self,
        cx: &App,
        model: Arc<dyn LanguageModel>,
    ) -> Vec<LanguageModelRequestTool> {
        if model.supports_tools() {
            self.tools()
                .read(cx)
                .enabled_tools(cx)
                .into_iter()
                .filter_map(|tool| {
                    // Skip tools that cannot be supported
                    let input_schema = tool.input_schema(model.tool_input_format()).ok()?;
                    Some(LanguageModelRequestTool {
                        name: tool.name(),
                        description: tool.description(),
                        input_schema,
                    })
                })
                .collect()
        } else {
            // Models without tool support get an empty tool list.
            Vec::default()
        }
    }
846
    /// Inserts a user message, tracking any context-referenced buffers in the
    /// action log and recording a pending git checkpoint when one is given.
    pub fn insert_user_message(
        &mut self,
        text: impl Into<String>,
        loaded_context: ContextLoadResult,
        git_checkpoint: Option<GitStoreCheckpoint>,
        creases: Vec<MessageCrease>,
        cx: &mut Context<Self>,
    ) -> MessageId {
        if !loaded_context.referenced_buffers.is_empty() {
            self.action_log.update(cx, |log, cx| {
                for buffer in loaded_context.referenced_buffers {
                    log.track_buffer(buffer, cx);
                }
            });
        }

        let message_id = self.insert_message(
            Role::User,
            vec![MessageSegment::Text(text.into())],
            loaded_context.loaded_context,
            creases,
            cx,
        );

        // The checkpoint is held as "pending" until generation finishes; see
        // `finalize_pending_checkpoint`.
        if let Some(git_checkpoint) = git_checkpoint {
            self.pending_checkpoint = Some(ThreadCheckpoint {
                message_id,
                git_checkpoint,
            });
        }

        self.auto_capture_telemetry(cx);

        message_id
    }
882
    /// Inserts an assistant message consisting of the given segments.
    pub fn insert_assistant_message(
        &mut self,
        segments: Vec<MessageSegment>,
        cx: &mut Context<Self>,
    ) -> MessageId {
        self.insert_message(
            Role::Assistant,
            segments,
            LoadedContext::default(),
            Vec::new(),
            cx,
        )
    }

    /// Appends a message with a freshly allocated ID and emits
    /// `MessageAdded`. Returns the new message's ID.
    pub fn insert_message(
        &mut self,
        role: Role,
        segments: Vec<MessageSegment>,
        loaded_context: LoadedContext,
        creases: Vec<MessageCrease>,
        cx: &mut Context<Self>,
    ) -> MessageId {
        let id = self.next_message_id.post_inc();
        self.messages.push(Message {
            id,
            role,
            segments,
            loaded_context,
            creases,
        });
        self.touch_updated_at();
        cx.emit(ThreadEvent::MessageAdded(id));
        id
    }
917
    /// Replaces the role and segments (and optionally the loaded context) of
    /// an existing message. Returns false when no message has the given ID.
    pub fn edit_message(
        &mut self,
        id: MessageId,
        new_role: Role,
        new_segments: Vec<MessageSegment>,
        loaded_context: Option<LoadedContext>,
        cx: &mut Context<Self>,
    ) -> bool {
        let Some(message) = self.messages.iter_mut().find(|message| message.id == id) else {
            return false;
        };
        message.role = new_role;
        message.segments = new_segments;
        if let Some(context) = loaded_context {
            message.loaded_context = context;
        }
        self.touch_updated_at();
        cx.emit(ThreadEvent::MessageEdited(id));
        true
    }

    /// Removes a message by ID. Returns false when it does not exist.
    ///
    /// NOTE(review): unlike `truncate`, this leaves any entry for `id` in
    /// `checkpoints_by_message` behind — confirm whether that is intentional.
    pub fn delete_message(&mut self, id: MessageId, cx: &mut Context<Self>) -> bool {
        let Some(index) = self.messages.iter().position(|message| message.id == id) else {
            return false;
        };
        self.messages.remove(index);
        self.touch_updated_at();
        cx.emit(ThreadEvent::MessageDeleted(id));
        true
    }
948
949 /// Returns the representation of this [`Thread`] in a textual form.
950 ///
951 /// This is the representation we use when attaching a thread as context to another thread.
952 pub fn text(&self) -> String {
953 let mut text = String::new();
954
955 for message in &self.messages {
956 text.push_str(match message.role {
957 language_model::Role::User => "User:",
958 language_model::Role::Assistant => "Assistant:",
959 language_model::Role::System => "System:",
960 });
961 text.push('\n');
962
963 for segment in &message.segments {
964 match segment {
965 MessageSegment::Text(content) => text.push_str(content),
966 MessageSegment::Thinking { text: content, .. } => {
967 text.push_str(&format!("<think>{}</think>", content))
968 }
969 MessageSegment::RedactedThinking(_) => {}
970 }
971 }
972 text.push('\n');
973 }
974
975 text
976 }
977
    /// Serializes this thread into a format for storage or telemetry.
    pub fn serialize(&self, cx: &mut Context<Self>) -> Task<Result<SerializedThread>> {
        let initial_project_snapshot = self.initial_project_snapshot.clone();
        cx.spawn(async move |this, cx| {
            // Wait for the snapshot captured at thread creation so it can be
            // embedded in the serialized form.
            let initial_project_snapshot = initial_project_snapshot.await;
            this.read_with(cx, |this, cx| SerializedThread {
                version: SerializedThread::VERSION.to_string(),
                summary: this.summary_or_default(),
                updated_at: this.updated_at(),
                messages: this
                    .messages()
                    .map(|message| SerializedMessage {
                        id: message.id,
                        role: message.role,
                        segments: message
                            .segments
                            .iter()
                            .map(|segment| match segment {
                                MessageSegment::Text(text) => {
                                    SerializedMessageSegment::Text { text: text.clone() }
                                }
                                MessageSegment::Thinking { text, signature } => {
                                    SerializedMessageSegment::Thinking {
                                        text: text.clone(),
                                        signature: signature.clone(),
                                    }
                                }
                                MessageSegment::RedactedThinking(data) => {
                                    SerializedMessageSegment::RedactedThinking {
                                        data: data.clone(),
                                    }
                                }
                            })
                            .collect(),
                        tool_uses: this
                            .tool_uses_for_message(message.id, cx)
                            .into_iter()
                            .map(|tool_use| SerializedToolUse {
                                id: tool_use.id,
                                name: tool_use.name,
                                input: tool_use.input,
                            })
                            .collect(),
                        tool_results: this
                            .tool_results_for_message(message.id)
                            .into_iter()
                            .map(|tool_result| SerializedToolResult {
                                tool_use_id: tool_result.tool_use_id.clone(),
                                is_error: tool_result.is_error,
                                content: tool_result.content.clone(),
                            })
                            .collect(),
                        // Only the flattened context text is persisted.
                        context: message.loaded_context.text.clone(),
                        creases: message
                            .creases
                            .iter()
                            .map(|crease| SerializedCrease {
                                start: crease.range.start,
                                end: crease.range.end,
                                icon_path: crease.metadata.icon_path.clone(),
                                label: crease.metadata.label.clone(),
                            })
                            .collect(),
                    })
                    .collect(),
                initial_project_snapshot,
                cumulative_token_usage: this.cumulative_token_usage,
                request_token_usage: this.request_token_usage.clone(),
                detailed_summary_state: this.detailed_summary_rx.borrow().clone(),
                exceeded_window_error: this.exceeded_window_error.clone(),
                model: this
                    .configured_model
                    .as_ref()
                    .map(|model| SerializedLanguageModel {
                        provider: model.provider.id().0.to_string(),
                        model: model.model.id().0.to_string(),
                    }),
            })
        })
    }
1058
    /// Number of agentic turns left before `send_to_model` becomes a no-op.
    pub fn remaining_turns(&self) -> u32 {
        self.remaining_turns
    }

    pub fn set_remaining_turns(&mut self, remaining_turns: u32) {
        self.remaining_turns = remaining_turns;
    }
1066
    /// Sends the current conversation to `model`, consuming one remaining
    /// turn. Does nothing when no turns are left.
    pub fn send_to_model(
        &mut self,
        model: Arc<dyn LanguageModel>,
        window: Option<AnyWindowHandle>,
        cx: &mut Context<Self>,
    ) {
        if self.remaining_turns == 0 {
            return;
        }

        self.remaining_turns -= 1;

        let request = self.to_completion_request(model.clone(), cx);

        self.stream_completion(request, model, window, cx);
    }
1083
1084 pub fn used_tools_since_last_user_message(&self) -> bool {
1085 for message in self.messages.iter().rev() {
1086 if self.tool_use.message_has_tool_results(message.id) {
1087 return true;
1088 } else if message.role == Role::User {
1089 return false;
1090 }
1091 }
1092
1093 false
1094 }
1095
    /// Builds the full `LanguageModelRequest` for this thread: system prompt,
    /// message history (with attached context and tool uses/results), stale
    /// file notices, available tools, and completion mode.
    pub fn to_completion_request(
        &self,
        model: Arc<dyn LanguageModel>,
        cx: &mut Context<Self>,
    ) -> LanguageModelRequest {
        let mut request = LanguageModelRequest {
            thread_id: Some(self.id.to_string()),
            prompt_id: Some(self.last_prompt_id.to_string()),
            mode: None,
            messages: vec![],
            tools: Vec::new(),
            stop: Vec::new(),
            temperature: None,
        };

        // The system prompt advertises which tools are available, so resolve
        // them before generating it.
        let available_tools = self.available_tools(cx, model.clone());
        let available_tool_names = available_tools
            .iter()
            .map(|tool| tool.name.clone())
            .collect();

        let model_context = &ModelContext {
            available_tools: available_tool_names,
        };

        if let Some(project_context) = self.project_context.borrow().as_ref() {
            match self
                .prompt_builder
                .generate_assistant_system_prompt(project_context, model_context)
            {
                Err(err) => {
                    let message = format!("{err:?}").into();
                    log::error!("{message}");
                    cx.emit(ThreadEvent::ShowError(ThreadError::Message {
                        header: "Error generating system prompt".into(),
                        message,
                    }));
                }
                Ok(system_prompt) => {
                    // System prompt goes first and is marked cacheable.
                    request.messages.push(LanguageModelRequestMessage {
                        role: Role::System,
                        content: vec![MessageContent::Text(system_prompt)],
                        cache: true,
                    });
                }
            }
        } else {
            // The request is still sent (without a system prompt); the error
            // is surfaced to the user.
            let message = "Context for system prompt unexpectedly not ready.".into();
            log::error!("{message}");
            cx.emit(ThreadEvent::ShowError(ThreadError::Message {
                header: "Error generating system prompt".into(),
                message,
            }));
        }

        for message in &self.messages {
            let mut request_message = LanguageModelRequestMessage {
                role: message.role,
                content: Vec::new(),
                cache: false,
            };

            // Loaded context (files, etc.) precedes the message's own segments.
            message
                .loaded_context
                .add_to_request_message(&mut request_message);

            for segment in &message.segments {
                match segment {
                    MessageSegment::Text(text) => {
                        if !text.is_empty() {
                            request_message
                                .content
                                .push(MessageContent::Text(text.into()));
                        }
                    }
                    MessageSegment::Thinking { text, signature } => {
                        if !text.is_empty() {
                            request_message.content.push(MessageContent::Thinking {
                                text: text.into(),
                                signature: signature.clone(),
                            });
                        }
                    }
                    MessageSegment::RedactedThinking(data) => {
                        request_message
                            .content
                            .push(MessageContent::RedactedThinking(data.clone()));
                    }
                };
            }

            self.tool_use
                .attach_tool_uses(message.id, &mut request_message);

            request.messages.push(request_message);

            // Tool results are sent as a separate message immediately after
            // the message that used the tool.
            if let Some(tool_results_message) = self.tool_use.tool_results_message(message.id) {
                request.messages.push(tool_results_message);
            }
        }

        // https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching
        if let Some(last) = request.messages.last_mut() {
            last.cache = true;
        }

        self.attached_tracked_files_state(&mut request.messages, cx);

        request.tools = available_tools;
        request.mode = if model.supports_max_mode() {
            Some(self.completion_mode)
        } else {
            Some(CompletionMode::Normal)
        };

        request
    }
1213
1214 fn to_summarize_request(&self, added_user_message: String) -> LanguageModelRequest {
1215 let mut request = LanguageModelRequest {
1216 thread_id: None,
1217 prompt_id: None,
1218 mode: None,
1219 messages: vec![],
1220 tools: Vec::new(),
1221 stop: Vec::new(),
1222 temperature: None,
1223 };
1224
1225 for message in &self.messages {
1226 let mut request_message = LanguageModelRequestMessage {
1227 role: message.role,
1228 content: Vec::new(),
1229 cache: false,
1230 };
1231
1232 for segment in &message.segments {
1233 match segment {
1234 MessageSegment::Text(text) => request_message
1235 .content
1236 .push(MessageContent::Text(text.clone())),
1237 MessageSegment::Thinking { .. } => {}
1238 MessageSegment::RedactedThinking(_) => {}
1239 }
1240 }
1241
1242 if request_message.content.is_empty() {
1243 continue;
1244 }
1245
1246 request.messages.push(request_message);
1247 }
1248
1249 request.messages.push(LanguageModelRequestMessage {
1250 role: Role::User,
1251 content: vec![MessageContent::Text(added_user_message)],
1252 cache: false,
1253 });
1254
1255 request
1256 }
1257
1258 fn attached_tracked_files_state(
1259 &self,
1260 messages: &mut Vec<LanguageModelRequestMessage>,
1261 cx: &App,
1262 ) {
1263 const STALE_FILES_HEADER: &str = "These files changed since last read:";
1264
1265 let mut stale_message = String::new();
1266
1267 let action_log = self.action_log.read(cx);
1268
1269 for stale_file in action_log.stale_buffers(cx) {
1270 let Some(file) = stale_file.read(cx).file() else {
1271 continue;
1272 };
1273
1274 if stale_message.is_empty() {
1275 write!(&mut stale_message, "{}\n", STALE_FILES_HEADER).ok();
1276 }
1277
1278 writeln!(&mut stale_message, "- {}", file.path().display()).ok();
1279 }
1280
1281 let mut content = Vec::with_capacity(2);
1282
1283 if !stale_message.is_empty() {
1284 content.push(stale_message.into());
1285 }
1286
1287 if !content.is_empty() {
1288 let context_message = LanguageModelRequestMessage {
1289 role: Role::User,
1290 content,
1291 cache: false,
1292 };
1293
1294 messages.push(context_message);
1295 }
1296 }
1297
    /// Starts streaming a completion for `request` from `model`.
    ///
    /// Spawns a task that forwards streamed events (text, thinking, tool
    /// uses, usage updates) into the thread as they arrive, maps errors to
    /// UI events, and reports token-usage telemetry when the stream ends.
    /// The task is registered in `pending_completions` so it can be canceled.
    pub fn stream_completion(
        &mut self,
        request: LanguageModelRequest,
        model: Arc<dyn LanguageModel>,
        window: Option<AnyWindowHandle>,
        cx: &mut Context<Self>,
    ) {
        let pending_completion_id = post_inc(&mut self.completion_count);
        // Only capture the request/response pair when a callback is installed.
        let mut request_callback_parameters = if self.request_callback.is_some() {
            Some((request.clone(), Vec::new()))
        } else {
            None
        };
        let prompt_id = self.last_prompt_id.clone();
        let tool_use_metadata = ToolUseMetadata {
            model: model.clone(),
            thread_id: self.id.clone(),
            prompt_id: prompt_id.clone(),
        };

        let task = cx.spawn(async move |thread, cx| {
            let stream_completion_future = model.stream_completion_with_usage(request, &cx);
            // Snapshot cumulative usage so the delta can be reported in
            // telemetry once the stream finishes.
            let initial_token_usage =
                thread.read_with(cx, |thread, _cx| thread.cumulative_token_usage);
            let stream_completion = async {
                let (mut events, usage) = stream_completion_future.await?;

                let mut stop_reason = StopReason::EndTurn;
                let mut current_token_usage = TokenUsage::default();

                if let Some(usage) = usage {
                    thread
                        .update(cx, |_thread, cx| {
                            cx.emit(ThreadEvent::UsageUpdated(usage));
                        })
                        .ok();
                }

                // The assistant message the streamed chunks are appended to,
                // created lazily on the first relevant event.
                let mut request_assistant_message_id = None;

                while let Some(event) = events.next().await {
                    if let Some((_, response_events)) = request_callback_parameters.as_mut() {
                        response_events
                            .push(event.as_ref().map_err(|error| error.to_string()).cloned());
                    }

                    thread.update(cx, |thread, cx| {
                        let event = match event {
                            Ok(event) => event,
                            Err(LanguageModelCompletionError::BadInputJson {
                                id,
                                tool_name,
                                raw_input: invalid_input_json,
                                json_parse_error,
                            }) => {
                                // Malformed tool input becomes a tool error
                                // result instead of aborting the stream.
                                thread.receive_invalid_tool_json(
                                    id,
                                    tool_name,
                                    invalid_input_json,
                                    json_parse_error,
                                    window,
                                    cx,
                                );
                                return Ok(());
                            }
                            Err(LanguageModelCompletionError::Other(error)) => {
                                return Err(error);
                            }
                        };

                        match event {
                            LanguageModelCompletionEvent::StartMessage { .. } => {
                                request_assistant_message_id =
                                    Some(thread.insert_assistant_message(
                                        vec![MessageSegment::Text(String::new())],
                                        cx,
                                    ));
                            }
                            LanguageModelCompletionEvent::Stop(reason) => {
                                stop_reason = reason;
                            }
                            LanguageModelCompletionEvent::UsageUpdate(token_usage) => {
                                // `token_usage` is cumulative for this stream;
                                // add only the delta since the last update.
                                thread.update_token_usage_at_last_message(token_usage);
                                thread.cumulative_token_usage = thread.cumulative_token_usage
                                    + token_usage
                                    - current_token_usage;
                                current_token_usage = token_usage;
                            }
                            LanguageModelCompletionEvent::Text(chunk) => {
                                cx.emit(ThreadEvent::ReceivedTextChunk);
                                if let Some(last_message) = thread.messages.last_mut() {
                                    if last_message.role == Role::Assistant
                                        && !thread.tool_use.has_tool_results(last_message.id)
                                    {
                                        last_message.push_text(&chunk);
                                        cx.emit(ThreadEvent::StreamedAssistantText(
                                            last_message.id,
                                            chunk,
                                        ));
                                    } else {
                                        // If we won't have an Assistant message yet, assume this chunk marks the beginning
                                        // of a new Assistant response.
                                        //
                                        // Importantly: We do *not* want to emit a `StreamedAssistantText` event here, as it
                                        // will result in duplicating the text of the chunk in the rendered Markdown.
                                        request_assistant_message_id =
                                            Some(thread.insert_assistant_message(
                                                vec![MessageSegment::Text(chunk.to_string())],
                                                cx,
                                            ));
                                    };
                                }
                            }
                            LanguageModelCompletionEvent::Thinking {
                                text: chunk,
                                signature,
                            } => {
                                if let Some(last_message) = thread.messages.last_mut() {
                                    if last_message.role == Role::Assistant
                                        && !thread.tool_use.has_tool_results(last_message.id)
                                    {
                                        last_message.push_thinking(&chunk, signature);
                                        cx.emit(ThreadEvent::StreamedAssistantThinking(
                                            last_message.id,
                                            chunk,
                                        ));
                                    } else {
                                        // If we won't have an Assistant message yet, assume this chunk marks the beginning
                                        // of a new Assistant response.
                                        //
                                        // Importantly: We do *not* want to emit a `StreamedAssistantText` event here, as it
                                        // will result in duplicating the text of the chunk in the rendered Markdown.
                                        request_assistant_message_id =
                                            Some(thread.insert_assistant_message(
                                                vec![MessageSegment::Thinking {
                                                    text: chunk.to_string(),
                                                    signature,
                                                }],
                                                cx,
                                            ));
                                    };
                                }
                            }
                            LanguageModelCompletionEvent::ToolUse(tool_use) => {
                                let last_assistant_message_id = request_assistant_message_id
                                    .unwrap_or_else(|| {
                                        let new_assistant_message_id =
                                            thread.insert_assistant_message(vec![], cx);
                                        request_assistant_message_id =
                                            Some(new_assistant_message_id);
                                        new_assistant_message_id
                                    });

                                let tool_use_id = tool_use.id.clone();
                                // Keep the partial input only while it is
                                // still streaming in.
                                let streamed_input = if tool_use.is_input_complete {
                                    None
                                } else {
                                    Some((&tool_use.input).clone())
                                };

                                let ui_text = thread.tool_use.request_tool_use(
                                    last_assistant_message_id,
                                    tool_use,
                                    tool_use_metadata.clone(),
                                    cx,
                                );

                                if let Some(input) = streamed_input {
                                    cx.emit(ThreadEvent::StreamedToolUse {
                                        tool_use_id,
                                        ui_text,
                                        input,
                                    });
                                }
                            }
                        }

                        thread.touch_updated_at();
                        cx.emit(ThreadEvent::StreamedCompletion);
                        cx.notify();

                        thread.auto_capture_telemetry(cx);
                        Ok(())
                    })??;

                    // Yield so other foreground tasks can run between events.
                    smol::future::yield_now().await;
                }

                thread.update(cx, |thread, cx| {
                    thread
                        .pending_completions
                        .retain(|completion| completion.id != pending_completion_id);

                    // If there is a response without tool use, summarize the message. Otherwise,
                    // allow two tool uses before summarizing.
                    if thread.summary.is_none()
                        && thread.messages.len() >= 2
                        && (!thread.has_pending_tool_uses() || thread.messages.len() >= 6)
                    {
                        thread.summarize(cx);
                    }
                })?;

                anyhow::Ok(stop_reason)
            };

            let result = stream_completion.await;

            thread
                .update(cx, |thread, cx| {
                    thread.finalize_pending_checkpoint(cx);
                    match result.as_ref() {
                        Ok(stop_reason) => match stop_reason {
                            StopReason::ToolUse => {
                                let tool_uses = thread.use_pending_tools(window, cx, model.clone());
                                cx.emit(ThreadEvent::UsePendingTools { tool_uses });
                            }
                            StopReason::EndTurn => {}
                            StopReason::MaxTokens => {}
                        },
                        Err(error) => {
                            // Map well-known billing/limit errors to dedicated
                            // UI events; anything else becomes a generic
                            // error message built from the error chain.
                            if error.is::<PaymentRequiredError>() {
                                cx.emit(ThreadEvent::ShowError(ThreadError::PaymentRequired));
                            } else if error.is::<MaxMonthlySpendReachedError>() {
                                cx.emit(ThreadEvent::ShowError(
                                    ThreadError::MaxMonthlySpendReached,
                                ));
                            } else if let Some(error) =
                                error.downcast_ref::<ModelRequestLimitReachedError>()
                            {
                                cx.emit(ThreadEvent::ShowError(
                                    ThreadError::ModelRequestLimitReached { plan: error.plan },
                                ));
                            } else if let Some(known_error) =
                                error.downcast_ref::<LanguageModelKnownError>()
                            {
                                match known_error {
                                    LanguageModelKnownError::ContextWindowLimitExceeded {
                                        tokens,
                                    } => {
                                        thread.exceeded_window_error = Some(ExceededWindowError {
                                            model_id: model.id(),
                                            token_count: *tokens,
                                        });
                                        cx.notify();
                                    }
                                }
                            } else {
                                let error_message = error
                                    .chain()
                                    .map(|err| err.to_string())
                                    .collect::<Vec<_>>()
                                    .join("\n");
                                cx.emit(ThreadEvent::ShowError(ThreadError::Message {
                                    header: "Error interacting with language model".into(),
                                    message: SharedString::from(error_message.clone()),
                                }));
                            }

                            thread.cancel_last_completion(window, cx);
                        }
                    }
                    cx.emit(ThreadEvent::Stopped(result.map_err(Arc::new)));

                    if let Some((request_callback, (request, response_events))) = thread
                        .request_callback
                        .as_mut()
                        .zip(request_callback_parameters.as_ref())
                    {
                        request_callback(request, response_events);
                    }

                    thread.auto_capture_telemetry(cx);

                    if let Ok(initial_usage) = initial_token_usage {
                        let usage = thread.cumulative_token_usage - initial_usage;

                        telemetry::event!(
                            "Assistant Thread Completion",
                            thread_id = thread.id().to_string(),
                            prompt_id = prompt_id,
                            model = model.telemetry_id(),
                            model_provider = model.provider_id().to_string(),
                            input_tokens = usage.input_tokens,
                            output_tokens = usage.output_tokens,
                            cache_creation_input_tokens = usage.cache_creation_input_tokens,
                            cache_read_input_tokens = usage.cache_read_input_tokens,
                        );
                    }
                })
                .ok();
        });

        self.pending_completions.push(PendingCompletion {
            id: pending_completion_id,
            _task: task,
        });
    }
1596
    /// Asynchronously generates a short title for the thread using the
    /// configured summary model, storing it in `self.summary` and emitting
    /// `SummaryGenerated` when done. No-ops when no summary model is
    /// configured or its provider is not authenticated.
    pub fn summarize(&mut self, cx: &mut Context<Self>) {
        let Some(model) = LanguageModelRegistry::read_global(cx).thread_summary_model() else {
            return;
        };

        if !model.provider.is_authenticated(cx) {
            return;
        }

        let added_user_message = "Generate a concise 3-7 word title for this conversation, omitting punctuation. \
            Go straight to the title, without any preamble and prefix like `Here's a concise suggestion:...` or `Title:`. \
            If the conversation is about a specific subject, include it in the title. \
            Be descriptive. DO NOT speak in the first person.";

        let request = self.to_summarize_request(added_user_message.into());

        self.pending_summary = cx.spawn(async move |this, cx| {
            async move {
                let stream = model.model.stream_completion_text_with_usage(request, &cx);
                let (mut messages, usage) = stream.await?;

                if let Some(usage) = usage {
                    this.update(cx, |_thread, cx| {
                        cx.emit(ThreadEvent::UsageUpdated(usage));
                    })
                    .ok();
                }

                // Accumulate only the first line of the response.
                let mut new_summary = String::new();
                while let Some(message) = messages.stream.next().await {
                    let text = message?;
                    let mut lines = text.lines();
                    new_summary.extend(lines.next());

                    // Stop if the LLM generated multiple lines.
                    if lines.next().is_some() {
                        break;
                    }
                }

                this.update(cx, |this, cx| {
                    // Keep any previous summary if the model returned nothing.
                    if !new_summary.is_empty() {
                        this.summary = Some(new_summary.into());
                    }

                    cx.emit(ThreadEvent::SummaryGenerated);
                })?;

                anyhow::Ok(())
            }
            .log_err()
            .await
        });
    }
1651
    /// Starts generating a detailed Markdown summary of the thread unless one
    /// is already generated (or being generated) for the current last
    /// message. On completion the summary is stored and the thread is saved
    /// via `thread_store` so the summary can be reused later.
    pub fn start_generating_detailed_summary_if_needed(
        &mut self,
        thread_store: WeakEntity<ThreadStore>,
        cx: &mut Context<Self>,
    ) {
        let Some(last_message_id) = self.messages.last().map(|message| message.id) else {
            return;
        };

        match &*self.detailed_summary_rx.borrow() {
            DetailedSummaryState::Generating { message_id, .. }
            | DetailedSummaryState::Generated { message_id, .. }
                if *message_id == last_message_id =>
            {
                // Already up-to-date
                return;
            }
            _ => {}
        }

        let Some(ConfiguredModel { model, provider }) =
            LanguageModelRegistry::read_global(cx).thread_summary_model()
        else {
            return;
        };

        if !provider.is_authenticated(cx) {
            return;
        }

        let added_user_message = "Generate a detailed summary of this conversation. Include:\n\
            1. A brief overview of what was discussed\n\
            2. Key facts or information discovered\n\
            3. Outcomes or conclusions reached\n\
            4. Any action items or next steps if any\n\
            Format it in Markdown with headings and bullet points.";

        let request = self.to_summarize_request(added_user_message.into());

        *self.detailed_summary_tx.borrow_mut() = DetailedSummaryState::Generating {
            message_id: last_message_id,
        };

        // Replace the detailed summarization task if there is one, cancelling it. It would probably
        // be better to allow the old task to complete, but this would require logic for choosing
        // which result to prefer (the old task could complete after the new one, resulting in a
        // stale summary).
        self.detailed_summary_task = cx.spawn(async move |thread, cx| {
            let stream = model.stream_completion_text(request, &cx);
            let Some(mut messages) = stream.await.log_err() else {
                // Reset state so a later call can retry.
                thread
                    .update(cx, |thread, _cx| {
                        *thread.detailed_summary_tx.borrow_mut() =
                            DetailedSummaryState::NotGenerated;
                    })
                    .ok()?;
                return None;
            };

            let mut new_detailed_summary = String::new();

            while let Some(chunk) = messages.stream.next().await {
                if let Some(chunk) = chunk.log_err() {
                    new_detailed_summary.push_str(&chunk);
                }
            }

            thread
                .update(cx, |thread, _cx| {
                    *thread.detailed_summary_tx.borrow_mut() = DetailedSummaryState::Generated {
                        text: new_detailed_summary.into(),
                        message_id: last_message_id,
                    };
                })
                .ok()?;

            // Save thread so its summary can be reused later
            if let Some(thread) = thread.upgrade() {
                if let Ok(Ok(save_task)) = cx.update(|cx| {
                    thread_store
                        .update(cx, |thread_store, cx| thread_store.save_thread(&thread, cx))
                }) {
                    save_task.await.log_err();
                }
            }

            Some(())
        });
    }
1741
    /// Waits until detailed-summary generation settles, then returns the
    /// generated summary — or the thread's plain text when no summary was
    /// generated. Returns `None` if the thread entity is dropped.
    pub async fn wait_for_detailed_summary_or_text(
        this: &Entity<Self>,
        cx: &mut AsyncApp,
    ) -> Option<SharedString> {
        let mut detailed_summary_rx = this
            .read_with(cx, |this, _cx| this.detailed_summary_rx.clone())
            .ok()?;
        loop {
            match detailed_summary_rx.recv().await? {
                // Still in flight — keep waiting for the next state change.
                DetailedSummaryState::Generating { .. } => {}
                DetailedSummaryState::NotGenerated => {
                    return this.read_with(cx, |this, _cx| this.text().into()).ok();
                }
                DetailedSummaryState::Generated { text, .. } => return Some(text),
            }
        }
    }
1759
1760 pub fn latest_detailed_summary_or_text(&self) -> SharedString {
1761 self.detailed_summary_rx
1762 .borrow()
1763 .text()
1764 .unwrap_or_else(|| self.text().into())
1765 }
1766
    /// Whether a detailed-summary generation task is currently in flight.
    pub fn is_generating_detailed_summary(&self) -> bool {
        matches!(
            &*self.detailed_summary_rx.borrow(),
            DetailedSummaryState::Generating { .. }
        )
    }
1773
    /// Dispatches all idle pending tool uses: tools requiring confirmation
    /// (unless `always_allow_tool_actions` is set) are queued for user
    /// approval, the rest are run immediately. Returns the tool uses that
    /// were considered.
    pub fn use_pending_tools(
        &mut self,
        window: Option<AnyWindowHandle>,
        cx: &mut Context<Self>,
        model: Arc<dyn LanguageModel>,
    ) -> Vec<PendingToolUse> {
        self.auto_capture_telemetry(cx);
        // The tools receive the same request messages the model saw.
        let request = self.to_completion_request(model, cx);
        let messages = Arc::new(request.messages);
        let pending_tool_uses = self
            .tool_use
            .pending_tool_uses()
            .into_iter()
            .filter(|tool_use| tool_use.status.is_idle())
            .cloned()
            .collect::<Vec<_>>();

        for tool_use in pending_tool_uses.iter() {
            // Tool uses referencing unknown tools are silently skipped here;
            // presumably handled elsewhere — NOTE(review): confirm.
            if let Some(tool) = self.tools.read(cx).tool(&tool_use.name, cx) {
                if tool.needs_confirmation(&tool_use.input, cx)
                    && !AssistantSettings::get_global(cx).always_allow_tool_actions
                {
                    self.tool_use.confirm_tool_use(
                        tool_use.id.clone(),
                        tool_use.ui_text.clone(),
                        tool_use.input.clone(),
                        messages.clone(),
                        tool,
                    );
                    cx.emit(ThreadEvent::ToolConfirmationNeeded);
                } else {
                    self.run_tool(
                        tool_use.id.clone(),
                        tool_use.ui_text.clone(),
                        tool_use.input.clone(),
                        &messages,
                        tool,
                        window,
                        cx,
                    );
                }
            }
        }

        pending_tool_uses
    }
1820
    /// Records a tool use whose input JSON failed to parse: stores an error
    /// tool result, emits `InvalidToolInput`, and runs the normal
    /// tool-finished bookkeeping.
    pub fn receive_invalid_tool_json(
        &mut self,
        tool_use_id: LanguageModelToolUseId,
        tool_name: Arc<str>,
        invalid_json: Arc<str>,
        error: String,
        window: Option<AnyWindowHandle>,
        cx: &mut Context<Thread>,
    ) {
        log::error!("The model returned invalid input JSON: {invalid_json}");

        let pending_tool_use = self.tool_use.insert_tool_output(
            tool_use_id.clone(),
            tool_name,
            Err(anyhow!("Error parsing input JSON: {error}")),
            self.configured_model.as_ref(),
        );
        let ui_text = if let Some(pending_tool_use) = &pending_tool_use {
            pending_tool_use.ui_text.clone()
        } else {
            // Should not happen: a finished tool use should have been pending.
            log::error!(
                "There was no pending tool use for tool use {tool_use_id}, even though it finished (with invalid input JSON)."
            );
            format!("Unknown tool {}", tool_use_id).into()
        };

        cx.emit(ThreadEvent::InvalidToolInput {
            tool_use_id: tool_use_id.clone(),
            ui_text,
            invalid_input_json: invalid_json,
        });

        self.tool_finished(tool_use_id, pending_tool_use, false, window, cx);
    }
1855
1856 pub fn run_tool(
1857 &mut self,
1858 tool_use_id: LanguageModelToolUseId,
1859 ui_text: impl Into<SharedString>,
1860 input: serde_json::Value,
1861 messages: &[LanguageModelRequestMessage],
1862 tool: Arc<dyn Tool>,
1863 window: Option<AnyWindowHandle>,
1864 cx: &mut Context<Thread>,
1865 ) {
1866 let task = self.spawn_tool_use(tool_use_id.clone(), messages, input, tool, window, cx);
1867 self.tool_use
1868 .run_pending_tool(tool_use_id, ui_text.into(), task);
1869 }
1870
    /// Runs `tool` with `input` (unless the tool is disabled) and returns a
    /// task that records the tool's output on this thread when it completes.
    fn spawn_tool_use(
        &mut self,
        tool_use_id: LanguageModelToolUseId,
        messages: &[LanguageModelRequestMessage],
        input: serde_json::Value,
        tool: Arc<dyn Tool>,
        window: Option<AnyWindowHandle>,
        cx: &mut Context<Thread>,
    ) -> Task<()> {
        let tool_name: Arc<str> = tool.name().into();

        // Disabled tools still produce a (failed) result so the model sees
        // an answer for the tool use.
        let tool_result = if self.tools.read(cx).is_disabled(&tool.source(), &tool_name) {
            Task::ready(Err(anyhow!("tool is disabled: {tool_name}"))).into()
        } else {
            tool.run(
                input,
                messages,
                self.project.clone(),
                self.action_log.clone(),
                window,
                cx,
            )
        };

        // Store the card separately if it exists
        if let Some(card) = tool_result.card.clone() {
            self.tool_use
                .insert_tool_result_card(tool_use_id.clone(), card);
        }

        cx.spawn({
            async move |thread: WeakEntity<Thread>, cx| {
                let output = tool_result.output.await;

                thread
                    .update(cx, |thread, cx| {
                        let pending_tool_use = thread.tool_use.insert_tool_output(
                            tool_use_id.clone(),
                            tool_name,
                            output,
                            thread.configured_model.as_ref(),
                        );
                        thread.tool_finished(tool_use_id, pending_tool_use, false, window, cx);
                    })
                    .ok();
            }
        })
    }
1919
    /// Bookkeeping after a tool use completes (or is canceled). When every
    /// tool has finished and the completion was not canceled, the thread is
    /// automatically sent back to the configured model so it can continue.
    fn tool_finished(
        &mut self,
        tool_use_id: LanguageModelToolUseId,
        pending_tool_use: Option<PendingToolUse>,
        canceled: bool,
        window: Option<AnyWindowHandle>,
        cx: &mut Context<Self>,
    ) {
        if self.all_tools_finished() {
            if let Some(ConfiguredModel { model, .. }) = self.configured_model.as_ref() {
                if !canceled {
                    self.send_to_model(model.clone(), window, cx);
                }
                self.auto_capture_telemetry(cx);
            }
        }

        cx.emit(ThreadEvent::ToolFinished {
            tool_use_id,
            pending_tool_use,
        });
    }
1942
1943 /// Cancels the last pending completion, if there are any pending.
1944 ///
1945 /// Returns whether a completion was canceled.
1946 pub fn cancel_last_completion(
1947 &mut self,
1948 window: Option<AnyWindowHandle>,
1949 cx: &mut Context<Self>,
1950 ) -> bool {
1951 let mut canceled = self.pending_completions.pop().is_some();
1952
1953 for pending_tool_use in self.tool_use.cancel_pending() {
1954 canceled = true;
1955 self.tool_finished(
1956 pending_tool_use.id.clone(),
1957 Some(pending_tool_use),
1958 true,
1959 window,
1960 cx,
1961 );
1962 }
1963
1964 self.finalize_pending_checkpoint(cx);
1965 canceled
1966 }
1967
    /// Signals that any in-progress editing should be canceled.
    ///
    /// This method is used to notify listeners (like ActiveThread) that
    /// they should cancel any editing operations.
    pub fn cancel_editing(&mut self, cx: &mut Context<Self>) {
        cx.emit(ThreadEvent::CancelEditing);
    }

    /// Returns the thread-level feedback, if any was reported.
    pub fn feedback(&self) -> Option<ThreadFeedback> {
        self.feedback
    }

    /// Returns the feedback reported for a specific message, if any.
    pub fn message_feedback(&self, message_id: MessageId) -> Option<ThreadFeedback> {
        self.message_feedback.get(&message_id).copied()
    }
1983
    /// Records `feedback` for `message_id` and reports it (together with a
    /// serialized thread and project snapshot) via telemetry. No-ops when
    /// the same feedback was already recorded for that message.
    pub fn report_message_feedback(
        &mut self,
        message_id: MessageId,
        feedback: ThreadFeedback,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        if self.message_feedback.get(&message_id) == Some(&feedback) {
            return Task::ready(Ok(()));
        }

        // Capture everything that needs `cx` before moving to the background.
        let final_project_snapshot = Self::project_snapshot(self.project.clone(), cx);
        let serialized_thread = self.serialize(cx);
        let thread_id = self.id().clone();
        let client = self.project.read(cx).client();

        let enabled_tool_names: Vec<String> = self
            .tools()
            .read(cx)
            .enabled_tools(cx)
            .iter()
            .map(|tool| tool.name().to_string())
            .collect();

        self.message_feedback.insert(message_id, feedback);

        cx.notify();

        let message_content = self
            .message(message_id)
            .map(|msg| msg.to_string())
            .unwrap_or_default();

        cx.background_spawn(async move {
            let final_project_snapshot = final_project_snapshot.await;
            let serialized_thread = serialized_thread.await?;
            // Serialization failure degrades to a null payload rather than
            // dropping the telemetry event.
            let thread_data =
                serde_json::to_value(serialized_thread).unwrap_or_else(|_| serde_json::Value::Null);

            let rating = match feedback {
                ThreadFeedback::Positive => "positive",
                ThreadFeedback::Negative => "negative",
            };
            telemetry::event!(
                "Assistant Thread Rated",
                rating,
                thread_id,
                enabled_tool_names,
                message_id = message_id.0,
                message_content,
                thread_data,
                final_project_snapshot
            );
            client.telemetry().flush_events().await;

            Ok(())
        })
    }
2041
    /// Reports feedback for the thread: attributes it to the last assistant
    /// message when one exists, otherwise records thread-level feedback and
    /// sends the telemetry event directly.
    pub fn report_feedback(
        &mut self,
        feedback: ThreadFeedback,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let last_assistant_message_id = self
            .messages
            .iter()
            .rev()
            .find(|msg| msg.role == Role::Assistant)
            .map(|msg| msg.id);

        if let Some(message_id) = last_assistant_message_id {
            self.report_message_feedback(message_id, feedback, cx)
        } else {
            // No assistant message to attach to; mirrors the telemetry path
            // of `report_message_feedback` without per-message fields.
            let final_project_snapshot = Self::project_snapshot(self.project.clone(), cx);
            let serialized_thread = self.serialize(cx);
            let thread_id = self.id().clone();
            let client = self.project.read(cx).client();
            self.feedback = Some(feedback);
            cx.notify();

            cx.background_spawn(async move {
                let final_project_snapshot = final_project_snapshot.await;
                let serialized_thread = serialized_thread.await?;
                let thread_data = serde_json::to_value(serialized_thread)
                    .unwrap_or_else(|_| serde_json::Value::Null);

                let rating = match feedback {
                    ThreadFeedback::Positive => "positive",
                    ThreadFeedback::Negative => "negative",
                };
                telemetry::event!(
                    "Assistant Thread Rated",
                    rating,
                    thread_id,
                    thread_data,
                    final_project_snapshot
                );
                client.telemetry().flush_events().await;

                Ok(())
            })
        }
    }
2087
    /// Create a snapshot of the current project state including git information and unsaved buffers.
    ///
    /// Collects per-worktree snapshots concurrently, then gathers the paths
    /// of all dirty (unsaved) buffers.
    fn project_snapshot(
        project: Entity<Project>,
        cx: &mut Context<Self>,
    ) -> Task<Arc<ProjectSnapshot>> {
        let git_store = project.read(cx).git_store().clone();
        let worktree_snapshots: Vec<_> = project
            .read(cx)
            .visible_worktrees(cx)
            .map(|worktree| Self::worktree_snapshot(worktree, git_store.clone(), cx))
            .collect();

        cx.spawn(async move |_, cx| {
            let worktree_snapshots = futures::future::join_all(worktree_snapshots).await;

            let mut unsaved_buffers = Vec::new();
            cx.update(|app_cx| {
                let buffer_store = project.read(app_cx).buffer_store();
                for buffer_handle in buffer_store.read(app_cx).buffers() {
                    let buffer = buffer_handle.read(app_cx);
                    if buffer.is_dirty() {
                        if let Some(file) = buffer.file() {
                            let path = file.path().to_string_lossy().to_string();
                            unsaved_buffers.push(path);
                        }
                    }
                }
            })
            .ok();

            Arc::new(ProjectSnapshot {
                worktree_snapshots,
                unsaved_buffer_paths: unsaved_buffers,
                timestamp: Utc::now(),
            })
        })
    }
2125
2126 fn worktree_snapshot(
2127 worktree: Entity<project::Worktree>,
2128 git_store: Entity<GitStore>,
2129 cx: &App,
2130 ) -> Task<WorktreeSnapshot> {
2131 cx.spawn(async move |cx| {
2132 // Get worktree path and snapshot
2133 let worktree_info = cx.update(|app_cx| {
2134 let worktree = worktree.read(app_cx);
2135 let path = worktree.abs_path().to_string_lossy().to_string();
2136 let snapshot = worktree.snapshot();
2137 (path, snapshot)
2138 });
2139
2140 let Ok((worktree_path, _snapshot)) = worktree_info else {
2141 return WorktreeSnapshot {
2142 worktree_path: String::new(),
2143 git_state: None,
2144 };
2145 };
2146
2147 let git_state = git_store
2148 .update(cx, |git_store, cx| {
2149 git_store
2150 .repositories()
2151 .values()
2152 .find(|repo| {
2153 repo.read(cx)
2154 .abs_path_to_repo_path(&worktree.read(cx).abs_path())
2155 .is_some()
2156 })
2157 .cloned()
2158 })
2159 .ok()
2160 .flatten()
2161 .map(|repo| {
2162 repo.update(cx, |repo, _| {
2163 let current_branch =
2164 repo.branch.as_ref().map(|branch| branch.name().to_owned());
2165 repo.send_job(None, |state, _| async move {
2166 let RepositoryState::Local { backend, .. } = state else {
2167 return GitState {
2168 remote_url: None,
2169 head_sha: None,
2170 current_branch,
2171 diff: None,
2172 };
2173 };
2174
2175 let remote_url = backend.remote_url("origin");
2176 let head_sha = backend.head_sha().await;
2177 let diff = backend.diff(DiffType::HeadToWorktree).await.ok();
2178
2179 GitState {
2180 remote_url,
2181 head_sha,
2182 current_branch,
2183 diff,
2184 }
2185 })
2186 })
2187 });
2188
2189 let git_state = match git_state {
2190 Some(git_state) => match git_state.ok() {
2191 Some(git_state) => git_state.await.ok(),
2192 None => None,
2193 },
2194 None => None,
2195 };
2196
2197 WorktreeSnapshot {
2198 worktree_path,
2199 git_state,
2200 }
2201 })
2202 }
2203
2204 pub fn to_markdown(&self, cx: &App) -> Result<String> {
2205 let mut markdown = Vec::new();
2206
2207 if let Some(summary) = self.summary() {
2208 writeln!(markdown, "# {summary}\n")?;
2209 };
2210
2211 for message in self.messages() {
2212 writeln!(
2213 markdown,
2214 "## {role}\n",
2215 role = match message.role {
2216 Role::User => "User",
2217 Role::Assistant => "Assistant",
2218 Role::System => "System",
2219 }
2220 )?;
2221
2222 if !message.loaded_context.text.is_empty() {
2223 writeln!(markdown, "{}", message.loaded_context.text)?;
2224 }
2225
2226 if !message.loaded_context.images.is_empty() {
2227 writeln!(
2228 markdown,
2229 "\n{} images attached as context.\n",
2230 message.loaded_context.images.len()
2231 )?;
2232 }
2233
2234 for segment in &message.segments {
2235 match segment {
2236 MessageSegment::Text(text) => writeln!(markdown, "{}\n", text)?,
2237 MessageSegment::Thinking { text, .. } => {
2238 writeln!(markdown, "<think>\n{}\n</think>\n", text)?
2239 }
2240 MessageSegment::RedactedThinking(_) => {}
2241 }
2242 }
2243
2244 for tool_use in self.tool_uses_for_message(message.id, cx) {
2245 writeln!(
2246 markdown,
2247 "**Use Tool: {} ({})**",
2248 tool_use.name, tool_use.id
2249 )?;
2250 writeln!(markdown, "```json")?;
2251 writeln!(
2252 markdown,
2253 "{}",
2254 serde_json::to_string_pretty(&tool_use.input)?
2255 )?;
2256 writeln!(markdown, "```")?;
2257 }
2258
2259 for tool_result in self.tool_results_for_message(message.id) {
2260 write!(markdown, "\n**Tool Results: {}", tool_result.tool_use_id)?;
2261 if tool_result.is_error {
2262 write!(markdown, " (Error)")?;
2263 }
2264
2265 writeln!(markdown, "**\n")?;
2266 writeln!(markdown, "{}", tool_result.content)?;
2267 }
2268 }
2269
2270 Ok(String::from_utf8_lossy(&markdown).to_string())
2271 }
2272
2273 pub fn keep_edits_in_range(
2274 &mut self,
2275 buffer: Entity<language::Buffer>,
2276 buffer_range: Range<language::Anchor>,
2277 cx: &mut Context<Self>,
2278 ) {
2279 self.action_log.update(cx, |action_log, cx| {
2280 action_log.keep_edits_in_range(buffer, buffer_range, cx)
2281 });
2282 }
2283
2284 pub fn keep_all_edits(&mut self, cx: &mut Context<Self>) {
2285 self.action_log
2286 .update(cx, |action_log, cx| action_log.keep_all_edits(cx));
2287 }
2288
2289 pub fn reject_edits_in_ranges(
2290 &mut self,
2291 buffer: Entity<language::Buffer>,
2292 buffer_ranges: Vec<Range<language::Anchor>>,
2293 cx: &mut Context<Self>,
2294 ) -> Task<Result<()>> {
2295 self.action_log.update(cx, |action_log, cx| {
2296 action_log.reject_edits_in_ranges(buffer, buffer_ranges, cx)
2297 })
2298 }
2299
    /// Returns the [`ActionLog`] tracking this thread's buffer edits.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
2303
    /// Returns the [`Project`] this thread operates on.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
2307
    /// Opportunistically serializes the thread and reports it via telemetry.
    ///
    /// Only runs when the `ThreadAutoCaptureFeatureFlag` is enabled, and at
    /// most once every 10 seconds per thread. Best-effort: all failures are
    /// silently dropped.
    pub fn auto_capture_telemetry(&mut self, cx: &mut Context<Self>) {
        if !cx.has_flag::<feature_flags::ThreadAutoCaptureFeatureFlag>() {
            return;
        }

        // Throttle: skip if the last capture was under 10 seconds ago.
        let now = Instant::now();
        if let Some(last) = self.last_auto_capture_at {
            if now.duration_since(last).as_secs() < 10 {
                return;
            }
        }

        self.last_auto_capture_at = Some(now);

        // Gather everything needed by the background task up front, since the
        // task cannot touch `self` or `cx` once spawned.
        let thread_id = self.id().clone();
        let github_login = self
            .project
            .read(cx)
            .user_store()
            .read(cx)
            .current_user()
            .map(|user| user.github_login.clone());
        let client = self.project.read(cx).client().clone();
        let serialize_task = self.serialize(cx);

        cx.background_executor()
            .spawn(async move {
                if let Ok(serialized_thread) = serialize_task.await {
                    if let Ok(thread_data) = serde_json::to_value(serialized_thread) {
                        telemetry::event!(
                            "Agent Thread Auto-Captured",
                            thread_id = thread_id.to_string(),
                            thread_data = thread_data,
                            auto_capture_reason = "tracked_user",
                            github_login = github_login
                        );

                        // Flush immediately rather than waiting for the
                        // periodic telemetry flush.
                        client.telemetry().flush_events().await;
                    }
                }
            })
            .detach();
    }
2351
    /// Returns the token usage accumulated over the lifetime of this thread.
    pub fn cumulative_token_usage(&self) -> TokenUsage {
        self.cumulative_token_usage
    }
2355
2356 pub fn token_usage_up_to_message(&self, message_id: MessageId) -> TotalTokenUsage {
2357 let Some(model) = self.configured_model.as_ref() else {
2358 return TotalTokenUsage::default();
2359 };
2360
2361 let max = model.model.max_token_count();
2362
2363 let index = self
2364 .messages
2365 .iter()
2366 .position(|msg| msg.id == message_id)
2367 .unwrap_or(0);
2368
2369 if index == 0 {
2370 return TotalTokenUsage { total: 0, max };
2371 }
2372
2373 let token_usage = &self
2374 .request_token_usage
2375 .get(index - 1)
2376 .cloned()
2377 .unwrap_or_default();
2378
2379 TotalTokenUsage {
2380 total: token_usage.total_tokens() as usize,
2381 max,
2382 }
2383 }
2384
2385 pub fn total_token_usage(&self) -> Option<TotalTokenUsage> {
2386 let model = self.configured_model.as_ref()?;
2387
2388 let max = model.model.max_token_count();
2389
2390 if let Some(exceeded_error) = &self.exceeded_window_error {
2391 if model.model.id() == exceeded_error.model_id {
2392 return Some(TotalTokenUsage {
2393 total: exceeded_error.token_count,
2394 max,
2395 });
2396 }
2397 }
2398
2399 let total = self
2400 .token_usage_at_last_message()
2401 .unwrap_or_default()
2402 .total_tokens() as usize;
2403
2404 Some(TotalTokenUsage { total, max })
2405 }
2406
2407 fn token_usage_at_last_message(&self) -> Option<TokenUsage> {
2408 self.request_token_usage
2409 .get(self.messages.len().saturating_sub(1))
2410 .or_else(|| self.request_token_usage.last())
2411 .cloned()
2412 }
2413
2414 fn update_token_usage_at_last_message(&mut self, token_usage: TokenUsage) {
2415 let placeholder = self.token_usage_at_last_message().unwrap_or_default();
2416 self.request_token_usage
2417 .resize(self.messages.len(), placeholder);
2418
2419 if let Some(last) = self.request_token_usage.last_mut() {
2420 *last = token_usage;
2421 }
2422 }
2423
2424 pub fn deny_tool_use(
2425 &mut self,
2426 tool_use_id: LanguageModelToolUseId,
2427 tool_name: Arc<str>,
2428 window: Option<AnyWindowHandle>,
2429 cx: &mut Context<Self>,
2430 ) {
2431 let err = Err(anyhow::anyhow!(
2432 "Permission to run tool action denied by user"
2433 ));
2434
2435 self.tool_use.insert_tool_output(
2436 tool_use_id.clone(),
2437 tool_name,
2438 err,
2439 self.configured_model.as_ref(),
2440 );
2441 self.tool_finished(tool_use_id.clone(), None, true, window, cx);
2442 }
2443}
2444
/// Errors surfaced to the user while running a thread.
#[derive(Debug, Clone, Error)]
pub enum ThreadError {
    /// The account requires payment before further completions can run.
    #[error("Payment required")]
    PaymentRequired,
    /// The user's configured monthly spending cap was reached.
    #[error("Max monthly spend reached")]
    MaxMonthlySpendReached,
    /// The plan's model-request quota was exhausted.
    #[error("Model request limit reached")]
    ModelRequestLimitReached { plan: Plan },
    /// A generic error carrying a display header and message body.
    #[error("Message {header}: {message}")]
    Message {
        header: SharedString,
        message: SharedString,
    },
}
2459
/// Events emitted by a `Thread` while a completion streams in and tools run.
#[derive(Debug, Clone)]
pub enum ThreadEvent {
    /// An error that should be shown to the user.
    ShowError(ThreadError),
    /// Updated request-usage information was received from the provider.
    UsageUpdated(RequestUsage),
    /// A completion event arrived on the stream.
    StreamedCompletion,
    /// A chunk of text was received from the model.
    ReceivedTextChunk,
    /// Streamed assistant text to append to the given message.
    StreamedAssistantText(MessageId, String),
    /// Streamed assistant "thinking" text to append to the given message.
    StreamedAssistantThinking(MessageId, String),
    /// A tool-use request was streamed from the model.
    StreamedToolUse {
        tool_use_id: LanguageModelToolUseId,
        ui_text: Arc<str>,
        input: serde_json::Value,
    },
    /// The model produced tool input that failed to parse as valid JSON.
    InvalidToolInput {
        tool_use_id: LanguageModelToolUseId,
        ui_text: Arc<str>,
        invalid_input_json: Arc<str>,
    },
    /// The completion finished, either with a stop reason or an error.
    Stopped(Result<StopReason, Arc<anyhow::Error>>),
    /// A message was appended to the thread.
    MessageAdded(MessageId),
    /// An existing message's content was edited.
    MessageEdited(MessageId),
    /// A message was removed from the thread.
    MessageDeleted(MessageId),
    /// A summary was generated for the thread.
    SummaryGenerated,
    /// The thread's summary text changed.
    SummaryChanged,
    /// Pending tool uses are ready to be executed.
    UsePendingTools {
        tool_uses: Vec<PendingToolUse>,
    },
    /// A tool finished running.
    ToolFinished {
        #[allow(unused)]
        tool_use_id: LanguageModelToolUseId,
        /// The pending tool use that corresponds to this tool.
        pending_tool_use: Option<PendingToolUse>,
    },
    /// The thread's git checkpoint changed.
    CheckpointChanged,
    /// A tool is waiting for user confirmation before running.
    ToolConfirmationNeeded,
    /// Editing of the in-progress message should be canceled.
    CancelEditing,
}
2497
// Lets `Thread` broadcast `ThreadEvent`s to gpui subscribers.
impl EventEmitter<ThreadEvent> for Thread {}
2499
/// An in-flight completion request; dropping `_task` cancels the work.
struct PendingCompletion {
    /// Monotonically increasing id distinguishing this completion from others.
    id: usize,
    /// Held only to keep the streaming task alive.
    _task: Task<()>,
}
2504
2505#[cfg(test)]
2506mod tests {
2507 use super::*;
2508 use crate::{ThreadStore, context::load_context, context_store::ContextStore, thread_store};
2509 use assistant_settings::AssistantSettings;
2510 use assistant_tool::ToolRegistry;
2511 use context_server::ContextServerSettings;
2512 use editor::EditorSettings;
2513 use gpui::TestAppContext;
2514 use language_model::fake_provider::FakeLanguageModel;
2515 use project::{FakeFs, Project};
2516 use prompt_store::PromptBuilder;
2517 use serde_json::json;
2518 use settings::{Settings, SettingsStore};
2519 use std::sync::Arc;
2520 use theme::ThemeSettings;
2521 use util::path;
2522 use workspace::Workspace;
2523
    // Verifies that context attached to a user message is stored on the
    // message and rendered into the completion request ahead of the user text.
    #[gpui::test]
    async fn test_message_with_context(cx: &mut TestAppContext) {
        init_test_settings(cx);

        let project = create_test_project(
            cx,
            json!({"code.rs": "fn main() {\n    println!(\"Hello, world!\");\n}"}),
        )
        .await;

        let (_workspace, _thread_store, thread, context_store, model) =
            setup_test_environment(cx, project.clone()).await;

        add_file_to_context(&project, &context_store, "test/code.rs", cx)
            .await
            .unwrap();

        let context = context_store.update(cx, |store, _| store.context().next().cloned().unwrap());
        let loaded_context = cx
            .update(|cx| load_context(vec![context], &project, &None, cx))
            .await;

        // Insert user message with context
        let message_id = thread.update(cx, |thread, cx| {
            thread.insert_user_message(
                "Please explain this code",
                loaded_context,
                None,
                Vec::new(),
                cx,
            )
        });

        // Check content and context in message object
        let message = thread.read_with(cx, |thread, _| thread.message(message_id).unwrap().clone());

        // Use different path format strings based on platform for the test
        #[cfg(windows)]
        let path_part = r"test\code.rs";
        #[cfg(not(windows))]
        let path_part = "test/code.rs";

        let expected_context = format!(
            r#"
<context>
The following items were attached by the user. They are up-to-date and don't need to be re-read.

<files>
```rs {path_part}
fn main() {{
    println!("Hello, world!");
}}
```
</files>
</context>
"#
        );

        assert_eq!(message.role, Role::User);
        assert_eq!(message.segments.len(), 1);
        assert_eq!(
            message.segments[0],
            MessageSegment::Text("Please explain this code".to_string())
        );
        assert_eq!(message.loaded_context.text, expected_context);

        // Check message in request: message 0 is the system prompt, message 1
        // is the user message with the context prepended.
        let request = thread.update(cx, |thread, cx| {
            thread.to_completion_request(model.clone(), cx)
        });

        assert_eq!(request.messages.len(), 2);
        let expected_full_message = format!("{}Please explain this code", expected_context);
        assert_eq!(request.messages[1].string_contents(), expected_full_message);
    }
2599
    // Verifies that each message only carries context that was not already
    // attached to an earlier message, and that `new_context_for_thread` can
    // recompute context relative to a given message id.
    #[gpui::test]
    async fn test_only_include_new_contexts(cx: &mut TestAppContext) {
        init_test_settings(cx);

        let project = create_test_project(
            cx,
            json!({
                "file1.rs": "fn function1() {}\n",
                "file2.rs": "fn function2() {}\n",
                "file3.rs": "fn function3() {}\n",
                "file4.rs": "fn function4() {}\n",
            }),
        )
        .await;

        let (_, _thread_store, thread, context_store, model) =
            setup_test_environment(cx, project.clone()).await;

        // First message with context 1
        add_file_to_context(&project, &context_store, "test/file1.rs", cx)
            .await
            .unwrap();
        let new_contexts = context_store.update(cx, |store, cx| {
            store.new_context_for_thread(thread.read(cx), None)
        });
        assert_eq!(new_contexts.len(), 1);
        let loaded_context = cx
            .update(|cx| load_context(new_contexts, &project, &None, cx))
            .await;
        let message1_id = thread.update(cx, |thread, cx| {
            thread.insert_user_message("Message 1", loaded_context, None, Vec::new(), cx)
        });

        // Second message with contexts 1 and 2 (context 1 should be skipped as it's already included)
        add_file_to_context(&project, &context_store, "test/file2.rs", cx)
            .await
            .unwrap();
        let new_contexts = context_store.update(cx, |store, cx| {
            store.new_context_for_thread(thread.read(cx), None)
        });
        assert_eq!(new_contexts.len(), 1);
        let loaded_context = cx
            .update(|cx| load_context(new_contexts, &project, &None, cx))
            .await;
        let message2_id = thread.update(cx, |thread, cx| {
            thread.insert_user_message("Message 2", loaded_context, None, Vec::new(), cx)
        });

        // Third message with all three contexts (contexts 1 and 2 should be skipped).
        add_file_to_context(&project, &context_store, "test/file3.rs", cx)
            .await
            .unwrap();
        let new_contexts = context_store.update(cx, |store, cx| {
            store.new_context_for_thread(thread.read(cx), None)
        });
        assert_eq!(new_contexts.len(), 1);
        let loaded_context = cx
            .update(|cx| load_context(new_contexts, &project, &None, cx))
            .await;
        let message3_id = thread.update(cx, |thread, cx| {
            thread.insert_user_message("Message 3", loaded_context, None, Vec::new(), cx)
        });

        // Check what contexts are included in each message
        let (message1, message2, message3) = thread.read_with(cx, |thread, _| {
            (
                thread.message(message1_id).unwrap().clone(),
                thread.message(message2_id).unwrap().clone(),
                thread.message(message3_id).unwrap().clone(),
            )
        });

        // First message should include context 1
        assert!(message1.loaded_context.text.contains("file1.rs"));

        // Second message should include only context 2 (not 1)
        assert!(!message2.loaded_context.text.contains("file1.rs"));
        assert!(message2.loaded_context.text.contains("file2.rs"));

        // Third message should include only context 3 (not 1 or 2)
        assert!(!message3.loaded_context.text.contains("file1.rs"));
        assert!(!message3.loaded_context.text.contains("file2.rs"));
        assert!(message3.loaded_context.text.contains("file3.rs"));

        // Check entire request to make sure all contexts are properly included
        let request = thread.update(cx, |thread, cx| {
            thread.to_completion_request(model.clone(), cx)
        });

        // The request should contain all 3 messages (plus the system prompt at index 0)
        assert_eq!(request.messages.len(), 4);

        // Check that the contexts are properly formatted in each message
        assert!(request.messages[1].string_contents().contains("file1.rs"));
        assert!(!request.messages[1].string_contents().contains("file2.rs"));
        assert!(!request.messages[1].string_contents().contains("file3.rs"));

        assert!(!request.messages[2].string_contents().contains("file1.rs"));
        assert!(request.messages[2].string_contents().contains("file2.rs"));
        assert!(!request.messages[2].string_contents().contains("file3.rs"));

        assert!(!request.messages[3].string_contents().contains("file1.rs"));
        assert!(!request.messages[3].string_contents().contains("file2.rs"));
        assert!(request.messages[3].string_contents().contains("file3.rs"));

        // Relative to message 2, contexts 2-4 count as "new" (only context 1
        // predates it).
        add_file_to_context(&project, &context_store, "test/file4.rs", cx)
            .await
            .unwrap();
        let new_contexts = context_store.update(cx, |store, cx| {
            store.new_context_for_thread(thread.read(cx), Some(message2_id))
        });
        assert_eq!(new_contexts.len(), 3);
        let loaded_context = cx
            .update(|cx| load_context(new_contexts, &project, &None, cx))
            .await
            .loaded_context;

        assert!(!loaded_context.text.contains("file1.rs"));
        assert!(loaded_context.text.contains("file2.rs"));
        assert!(loaded_context.text.contains("file3.rs"));
        assert!(loaded_context.text.contains("file4.rs"));

        let new_contexts = context_store.update(cx, |store, cx| {
            // Remove file4.rs
            store.remove_context(&loaded_context.contexts[2].handle(), cx);
            store.new_context_for_thread(thread.read(cx), Some(message2_id))
        });
        assert_eq!(new_contexts.len(), 2);
        let loaded_context = cx
            .update(|cx| load_context(new_contexts, &project, &None, cx))
            .await
            .loaded_context;

        assert!(!loaded_context.text.contains("file1.rs"));
        assert!(loaded_context.text.contains("file2.rs"));
        assert!(loaded_context.text.contains("file3.rs"));
        assert!(!loaded_context.text.contains("file4.rs"));

        let new_contexts = context_store.update(cx, |store, cx| {
            // Remove file3.rs
            store.remove_context(&loaded_context.contexts[1].handle(), cx);
            store.new_context_for_thread(thread.read(cx), Some(message2_id))
        });
        assert_eq!(new_contexts.len(), 1);
        let loaded_context = cx
            .update(|cx| load_context(new_contexts, &project, &None, cx))
            .await
            .loaded_context;

        assert!(!loaded_context.text.contains("file1.rs"));
        assert!(loaded_context.text.contains("file2.rs"));
        assert!(!loaded_context.text.contains("file3.rs"));
        assert!(!loaded_context.text.contains("file4.rs"));
    }
2755
    // Verifies that messages inserted with no context have empty context text
    // and appear verbatim in the completion request.
    #[gpui::test]
    async fn test_message_without_files(cx: &mut TestAppContext) {
        init_test_settings(cx);

        let project = create_test_project(
            cx,
            json!({"code.rs": "fn main() {\n    println!(\"Hello, world!\");\n}"}),
        )
        .await;

        let (_, _thread_store, thread, _context_store, model) =
            setup_test_environment(cx, project.clone()).await;

        // Insert user message without any context (empty context vector)
        let message_id = thread.update(cx, |thread, cx| {
            thread.insert_user_message(
                "What is the best way to learn Rust?",
                ContextLoadResult::default(),
                None,
                Vec::new(),
                cx,
            )
        });

        // Check content and context in message object
        let message = thread.read_with(cx, |thread, _| thread.message(message_id).unwrap().clone());

        // Context should be empty when no files are included
        assert_eq!(message.role, Role::User);
        assert_eq!(message.segments.len(), 1);
        assert_eq!(
            message.segments[0],
            MessageSegment::Text("What is the best way to learn Rust?".to_string())
        );
        assert_eq!(message.loaded_context.text, "");

        // Check message in request (index 0 is the system prompt)
        let request = thread.update(cx, |thread, cx| {
            thread.to_completion_request(model.clone(), cx)
        });

        assert_eq!(request.messages.len(), 2);
        assert_eq!(
            request.messages[1].string_contents(),
            "What is the best way to learn Rust?"
        );

        // Add second message, also without context
        let message2_id = thread.update(cx, |thread, cx| {
            thread.insert_user_message(
                "Are there any good books?",
                ContextLoadResult::default(),
                None,
                Vec::new(),
                cx,
            )
        });

        let message2 =
            thread.read_with(cx, |thread, _| thread.message(message2_id).unwrap().clone());
        assert_eq!(message2.loaded_context.text, "");

        // Check that both messages appear in the request
        let request = thread.update(cx, |thread, cx| {
            thread.to_completion_request(model.clone(), cx)
        });

        assert_eq!(request.messages.len(), 3);
        assert_eq!(
            request.messages[1].string_contents(),
            "What is the best way to learn Rust?"
        );
        assert_eq!(
            request.messages[2].string_contents(),
            "Are there any good books?"
        );
    }
2833
    // Verifies that when a buffer attached as context is modified after being
    // read, the completion request gains a trailing "files changed" notice.
    #[gpui::test]
    async fn test_stale_buffer_notification(cx: &mut TestAppContext) {
        init_test_settings(cx);

        let project = create_test_project(
            cx,
            json!({"code.rs": "fn main() {\n    println!(\"Hello, world!\");\n}"}),
        )
        .await;

        let (_workspace, _thread_store, thread, context_store, model) =
            setup_test_environment(cx, project.clone()).await;

        // Open buffer and add it to context
        let buffer = add_file_to_context(&project, &context_store, "test/code.rs", cx)
            .await
            .unwrap();

        let context = context_store.update(cx, |store, _| store.context().next().cloned().unwrap());
        let loaded_context = cx
            .update(|cx| load_context(vec![context], &project, &None, cx))
            .await;

        // Insert user message with the buffer as context
        thread.update(cx, |thread, cx| {
            thread.insert_user_message("Explain this code", loaded_context, None, Vec::new(), cx)
        });

        // Create a request and check that it doesn't have a stale buffer warning yet
        let initial_request = thread.update(cx, |thread, cx| {
            thread.to_completion_request(model.clone(), cx)
        });

        // Make sure we don't have a stale file warning yet
        let has_stale_warning = initial_request.messages.iter().any(|msg| {
            msg.string_contents()
                .contains("These files changed since last read:")
        });
        assert!(
            !has_stale_warning,
            "Should not have stale buffer warning before buffer is modified"
        );

        // Modify the buffer
        buffer.update(cx, |buffer, cx| {
            // Find a position at the end of line 1
            buffer.edit(
                [(1..1, "\n    println!(\"Added a new line\");\n")],
                None,
                cx,
            );
        });

        // Insert another user message without context
        thread.update(cx, |thread, cx| {
            thread.insert_user_message(
                "What does the code do now?",
                ContextLoadResult::default(),
                None,
                Vec::new(),
                cx,
            )
        });

        // Create a new request and check for the stale buffer warning
        let new_request = thread.update(cx, |thread, cx| {
            thread.to_completion_request(model.clone(), cx)
        });

        // We should have a stale file warning as the last message
        let last_message = new_request
            .messages
            .last()
            .expect("Request should have messages");

        // The last message should be the stale buffer notification
        assert_eq!(last_message.role, Role::User);

        // Check the exact content of the message
        let expected_content = "These files changed since last read:\n- code.rs\n";
        assert_eq!(
            last_message.string_contents(),
            expected_content,
            "Last message should be exactly the stale buffer notification"
        );
    }
2920
    /// Registers the settings and global state the thread tests rely on.
    ///
    /// NOTE(review): registration order may matter (the settings store must be
    /// set before the `register`/`init` calls) — keep additions at the end.
    fn init_test_settings(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
            language::init(cx);
            Project::init_settings(cx);
            AssistantSettings::register(cx);
            prompt_store::init(cx);
            thread_store::init(cx);
            workspace::init_settings(cx);
            language_model::init_settings(cx);
            ThemeSettings::register(cx);
            ContextServerSettings::register(cx);
            EditorSettings::register(cx);
            ToolRegistry::default_global(cx);
        });
    }
2938
    /// Creates a `FakeFs`-backed project rooted at `/test` containing the
    /// given `files` tree.
    async fn create_test_project(
        cx: &mut TestAppContext,
        files: serde_json::Value,
    ) -> Entity<Project> {
        let fs = FakeFs::new(cx.executor());
        fs.insert_tree(path!("/test"), files).await;
        Project::test(fs, [path!("/test").as_ref()], cx).await
    }
2948
    /// Builds the standard test fixtures: a workspace, a thread store with a
    /// fresh thread, an empty context store, and a fake language model.
    async fn setup_test_environment(
        cx: &mut TestAppContext,
        project: Entity<Project>,
    ) -> (
        Entity<Workspace>,
        Entity<ThreadStore>,
        Entity<Thread>,
        Entity<ContextStore>,
        Arc<dyn LanguageModel>,
    ) {
        let (workspace, cx) =
            cx.add_window_view(|window, cx| Workspace::test_new(project.clone(), window, cx));

        let thread_store = cx
            .update(|_, cx| {
                ThreadStore::load(
                    project.clone(),
                    cx.new(|_| ToolWorkingSet::default()),
                    None,
                    Arc::new(PromptBuilder::new(None).unwrap()),
                    cx,
                )
            })
            .await
            .unwrap();

        let thread = thread_store.update(cx, |store, cx| store.create_thread(cx));
        let context_store = cx.new(|_cx| ContextStore::new(project.downgrade(), None));

        // A `FakeLanguageModel` lets tests drive completions deterministically.
        let model = FakeLanguageModel::default();
        let model: Arc<dyn LanguageModel> = Arc::new(model);

        (workspace, thread_store, thread, context_store, model)
    }
2983
2984 async fn add_file_to_context(
2985 project: &Entity<Project>,
2986 context_store: &Entity<ContextStore>,
2987 path: &str,
2988 cx: &mut TestAppContext,
2989 ) -> Result<Entity<language::Buffer>> {
2990 let buffer_path = project
2991 .read_with(cx, |project, cx| project.find_project_path(path, cx))
2992 .unwrap();
2993
2994 let buffer = project
2995 .update(cx, |project, cx| {
2996 project.open_buffer(buffer_path.clone(), cx)
2997 })
2998 .await
2999 .unwrap();
3000
3001 context_store.update(cx, |context_store, cx| {
3002 context_store.add_file_from_buffer(&buffer_path, buffer.clone(), false, cx);
3003 });
3004
3005 Ok(buffer)
3006 }
3007}