1use std::fmt::Write as _;
2use std::io::Write;
3use std::ops::Range;
4use std::sync::Arc;
5use std::time::Instant;
6
7use anyhow::{Result, anyhow};
8use assistant_settings::AssistantSettings;
9use assistant_tool::{ActionLog, AnyToolCard, Tool, ToolWorkingSet};
10use chrono::{DateTime, Utc};
11use collections::HashMap;
12use editor::display_map::CreaseMetadata;
13use feature_flags::{self, FeatureFlagAppExt};
14use futures::future::Shared;
15use futures::{FutureExt, StreamExt as _};
16use git::repository::DiffType;
17use gpui::{
18 AnyWindowHandle, App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task,
19 WeakEntity,
20};
21use language_model::{
22 ConfiguredModel, LanguageModel, LanguageModelCompletionError, LanguageModelCompletionEvent,
23 LanguageModelId, LanguageModelKnownError, LanguageModelRegistry, LanguageModelRequest,
24 LanguageModelRequestMessage, LanguageModelRequestTool, LanguageModelToolResult,
25 LanguageModelToolUseId, MaxMonthlySpendReachedError, MessageContent,
26 ModelRequestLimitReachedError, PaymentRequiredError, RequestUsage, Role, SelectedModel,
27 StopReason, TokenUsage,
28};
29use postage::stream::Stream as _;
30use project::Project;
31use project::git_store::{GitStore, GitStoreCheckpoint, RepositoryState};
32use prompt_store::{ModelContext, PromptBuilder};
33use proto::Plan;
34use schemars::JsonSchema;
35use serde::{Deserialize, Serialize};
36use settings::Settings;
37use thiserror::Error;
38use util::{ResultExt as _, TryFutureExt as _, post_inc};
39use uuid::Uuid;
40use zed_llm_client::CompletionMode;
41
42use crate::ThreadStore;
43use crate::context::{AgentContext, AgentContextHandle, ContextLoadResult, LoadedContext};
44use crate::thread_store::{
45 SerializedCrease, SerializedLanguageModel, SerializedMessage, SerializedMessageSegment,
46 SerializedThread, SerializedToolResult, SerializedToolUse, SharedProjectContext,
47};
48use crate::tool_use::{PendingToolUse, ToolUse, ToolUseMetadata, ToolUseState};
49
/// Identifier of a [`Thread`], stored as a shared string.
///
/// Fresh identifiers are produced by [`ThreadId::new`]; existing ones can be
/// rebuilt from a `&str` via `From<&str>`.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize, JsonSchema,
)]
pub struct ThreadId(Arc<str>);
54
55impl ThreadId {
56 pub fn new() -> Self {
57 Self(Uuid::new_v4().to_string().into())
58 }
59}
60
61impl std::fmt::Display for ThreadId {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 write!(f, "{}", self.0)
64 }
65}
66
67impl From<&str> for ThreadId {
68 fn from(value: &str) -> Self {
69 Self(value.into())
70 }
71}
72
/// The ID of the user prompt that initiated a request.
///
/// This equates to the user physically submitting a message to the model (e.g., by pressing the Enter key).
///
/// Stored as a shared string; new values are created with [`PromptId::new`].
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
pub struct PromptId(Arc<str>);
78
79impl PromptId {
80 pub fn new() -> Self {
81 Self(Uuid::new_v4().to_string().into())
82 }
83}
84
85impl std::fmt::Display for PromptId {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 write!(f, "{}", self.0)
88 }
89}
90
/// Identifier of a [`Message`] within a thread, backed by a `usize` counter.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, Serialize, Deserialize)]
pub struct MessageId(pub(crate) usize);
93
94impl MessageId {
95 fn post_inc(&mut self) -> Self {
96 Self(post_inc(&mut self.0))
97 }
98}
99
/// Stored information that can be used to resurrect a context crease when creating an editor for a past message.
#[derive(Clone, Debug)]
pub struct MessageCrease {
    /// Range covered by the crease (serialized as `start`/`end`).
    pub range: Range<usize>,
    /// Icon path and label shown for the crease.
    pub metadata: CreaseMetadata,
    /// None for a deserialized message, Some otherwise.
    pub context: Option<AgentContextHandle>,
}
108
/// A message in a [`Thread`].
#[derive(Debug, Clone)]
pub struct Message {
    /// Identifier assigned when the message was inserted into the thread.
    pub id: MessageId,
    /// Who authored the message (user, assistant or system).
    pub role: Role,
    /// Ordered content pieces: plain text, thinking, or redacted thinking.
    pub segments: Vec<MessageSegment>,
    /// Context attached to the message; only its text survives serialization.
    pub loaded_context: LoadedContext,
    /// Creases to re-create when the message is shown in an editor again.
    pub creases: Vec<MessageCrease>,
}
118
119impl Message {
120 /// Returns whether the message contains any meaningful text that should be displayed
121 /// The model sometimes runs tool without producing any text or just a marker ([`USING_TOOL_MARKER`])
122 pub fn should_display_content(&self) -> bool {
123 self.segments.iter().all(|segment| segment.should_display())
124 }
125
126 pub fn push_thinking(&mut self, text: &str, signature: Option<String>) {
127 if let Some(MessageSegment::Thinking {
128 text: segment,
129 signature: current_signature,
130 }) = self.segments.last_mut()
131 {
132 if let Some(signature) = signature {
133 *current_signature = Some(signature);
134 }
135 segment.push_str(text);
136 } else {
137 self.segments.push(MessageSegment::Thinking {
138 text: text.to_string(),
139 signature,
140 });
141 }
142 }
143
144 pub fn push_text(&mut self, text: &str) {
145 if let Some(MessageSegment::Text(segment)) = self.segments.last_mut() {
146 segment.push_str(text);
147 } else {
148 self.segments.push(MessageSegment::Text(text.to_string()));
149 }
150 }
151
152 pub fn to_string(&self) -> String {
153 let mut result = String::new();
154
155 if !self.loaded_context.text.is_empty() {
156 result.push_str(&self.loaded_context.text);
157 }
158
159 for segment in &self.segments {
160 match segment {
161 MessageSegment::Text(text) => result.push_str(text),
162 MessageSegment::Thinking { text, .. } => {
163 result.push_str("<think>\n");
164 result.push_str(text);
165 result.push_str("\n</think>");
166 }
167 MessageSegment::RedactedThinking(_) => {}
168 }
169 }
170
171 result
172 }
173}
174
/// One piece of a [`Message`]'s content.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MessageSegment {
    /// Plain text.
    Text(String),
    /// Thinking text, with an optional signature.
    Thinking {
        text: String,
        signature: Option<String>,
    },
    /// Opaque bytes of redacted thinking; passed through but never rendered as text.
    RedactedThinking(Vec<u8>),
}
184
185impl MessageSegment {
186 pub fn should_display(&self) -> bool {
187 match self {
188 Self::Text(text) => text.is_empty(),
189 Self::Thinking { text, .. } => text.is_empty(),
190 Self::RedactedThinking(_) => false,
191 }
192 }
193}
194
/// Serializable snapshot of a project, stored with a thread as its initial project state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProjectSnapshot {
    /// One entry per worktree captured in the snapshot.
    pub worktree_snapshots: Vec<WorktreeSnapshot>,
    /// Paths of buffers that were unsaved when the snapshot was taken.
    pub unsaved_buffer_paths: Vec<String>,
    /// When the snapshot was taken (UTC).
    pub timestamp: DateTime<Utc>,
}
201
/// Serializable snapshot of a single worktree.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorktreeSnapshot {
    /// Path of the worktree.
    pub worktree_path: String,
    /// Git information for the worktree, if any was captured.
    pub git_state: Option<GitState>,
}
207
/// Serializable git information for a worktree; every field is optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GitState {
    /// URL of the remote.
    pub remote_url: Option<String>,
    /// SHA of `HEAD`.
    pub head_sha: Option<String>,
    /// Name of the current branch.
    pub current_branch: Option<String>,
    /// Textual diff captured with the snapshot.
    pub diff: Option<String>,
}
215
/// A git checkpoint associated with the message it was taken for.
#[derive(Clone)]
pub struct ThreadCheckpoint {
    /// Message the checkpoint belongs to; restoring truncates the thread at this message.
    message_id: MessageId,
    /// Checkpoint handle passed to the git store when restoring or comparing.
    git_checkpoint: GitStoreCheckpoint,
}
221
/// Feedback rating recorded for a thread or for an individual message.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ThreadFeedback {
    Positive,
    Negative,
}
227
/// State of the most recent checkpoint restore that has not completed successfully.
pub enum LastRestoreCheckpoint {
    /// A restore for `message_id` is currently in flight.
    Pending {
        message_id: MessageId,
    },
    /// The restore for `message_id` failed; `error` holds the error's string form.
    Error {
        message_id: MessageId,
        error: String,
    },
}
237
238impl LastRestoreCheckpoint {
239 pub fn message_id(&self) -> MessageId {
240 match self {
241 LastRestoreCheckpoint::Pending { message_id } => *message_id,
242 LastRestoreCheckpoint::Error { message_id, .. } => *message_id,
243 }
244 }
245}
246
/// Progress of the detailed summary for a thread.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub enum DetailedSummaryState {
    /// No detailed summary has been produced (the default).
    #[default]
    NotGenerated,
    /// A detailed summary is being generated; carries the associated message id.
    Generating {
        message_id: MessageId,
    },
    /// A detailed summary is available; carries its text and the associated message id.
    Generated {
        text: SharedString,
        message_id: MessageId,
    },
}
259
260impl DetailedSummaryState {
261 fn text(&self) -> Option<SharedString> {
262 if let Self::Generated { text, .. } = self {
263 Some(text.clone())
264 } else {
265 None
266 }
267 }
268}
269
/// Token usage of a thread relative to a maximum.
#[derive(Default)]
pub struct TotalTokenUsage {
    /// Tokens used so far.
    pub total: usize,
    /// Maximum number of tokens; `0` is treated as "unknown" by [`TotalTokenUsage::ratio`].
    pub max: usize,
}
275
276impl TotalTokenUsage {
277 pub fn ratio(&self) -> TokenUsageRatio {
278 #[cfg(debug_assertions)]
279 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
280 .unwrap_or("0.8".to_string())
281 .parse()
282 .unwrap();
283 #[cfg(not(debug_assertions))]
284 let warning_threshold: f32 = 0.8;
285
286 // When the maximum is unknown because there is no selected model,
287 // avoid showing the token limit warning.
288 if self.max == 0 {
289 TokenUsageRatio::Normal
290 } else if self.total >= self.max {
291 TokenUsageRatio::Exceeded
292 } else if self.total as f32 / self.max as f32 >= warning_threshold {
293 TokenUsageRatio::Warning
294 } else {
295 TokenUsageRatio::Normal
296 }
297 }
298
299 pub fn add(&self, tokens: usize) -> TotalTokenUsage {
300 TotalTokenUsage {
301 total: self.total + tokens,
302 max: self.max,
303 }
304 }
305}
306
/// Classification of token usage produced by [`TotalTokenUsage::ratio`].
#[derive(Debug, Default, PartialEq, Eq)]
pub enum TokenUsageRatio {
    /// Usage is below the warning threshold, or the maximum is unknown.
    #[default]
    Normal,
    /// Usage has reached the warning threshold but not the maximum.
    Warning,
    /// Usage has reached or passed the maximum.
    Exceeded,
}
314
315fn default_completion_mode(cx: &App) -> CompletionMode {
316 if cx.is_staff() {
317 CompletionMode::Max
318 } else {
319 CompletionMode::Normal
320 }
321}
322
/// A thread of conversation with the LLM.
pub struct Thread {
    id: ThreadId,
    /// Refreshed whenever a message is inserted, edited or deleted.
    updated_at: DateTime<Utc>,
    /// `None` until a summary exists; `set_summary` is ignored while this is `None`.
    summary: Option<SharedString>,
    pending_summary: Task<Option<()>>,
    detailed_summary_task: Task<Option<()>>,
    detailed_summary_tx: postage::watch::Sender<DetailedSummaryState>,
    detailed_summary_rx: postage::watch::Receiver<DetailedSummaryState>,
    completion_mode: CompletionMode,
    /// Messages in insertion order; `message()` binary-searches this by id.
    messages: Vec<Message>,
    /// Id that the next inserted message will receive.
    next_message_id: MessageId,
    /// Prompt id attached to completion requests; replaced by `advance_prompt_id`.
    last_prompt_id: PromptId,
    project_context: SharedProjectContext,
    /// Checkpoints that were kept, keyed by the message they belong to.
    checkpoints_by_message: HashMap<MessageId, ThreadCheckpoint>,
    completion_count: usize,
    pending_completions: Vec<PendingCompletion>,
    project: Entity<Project>,
    prompt_builder: Arc<PromptBuilder>,
    tools: Entity<ToolWorkingSet>,
    tool_use: ToolUseState,
    action_log: Entity<ActionLog>,
    last_restore_checkpoint: Option<LastRestoreCheckpoint>,
    /// Checkpoint recorded with the latest user message, awaiting `finalize_pending_checkpoint`.
    pending_checkpoint: Option<ThreadCheckpoint>,
    initial_project_snapshot: Shared<Task<Option<Arc<ProjectSnapshot>>>>,
    request_token_usage: Vec<TokenUsage>,
    cumulative_token_usage: TokenUsage,
    exceeded_window_error: Option<ExceededWindowError>,
    feedback: Option<ThreadFeedback>,
    message_feedback: HashMap<MessageId, ThreadFeedback>,
    last_auto_capture_at: Option<Instant>,
    /// Optional hook installed through `set_request_callback`.
    request_callback: Option<
        Box<dyn FnMut(&LanguageModelRequest, &[Result<LanguageModelCompletionEvent, String>])>,
    >,
    /// Number of further `send_to_model` calls allowed; each call consumes one.
    remaining_turns: u32,
    configured_model: Option<ConfiguredModel>,
}
360
/// Records which model and how many tokens were involved when a message exceeded the
/// context window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExceededWindowError {
    /// Model used when last message exceeded context window
    model_id: LanguageModelId,
    /// Token count including last message
    token_count: usize,
}
368
369impl Thread {
    /// Creates an empty thread with a fresh [`ThreadId`] and [`PromptId`].
    ///
    /// The configured model starts as the registry's default model, the completion mode comes
    /// from `default_completion_mode`, and `system_prompt` is stored as the thread's project
    /// context. A project snapshot task is spawned on the foreground executor and kept as a
    /// shared future so it can be awaited later (for example by [`Thread::serialize`]).
    pub fn new(
        project: Entity<Project>,
        tools: Entity<ToolWorkingSet>,
        prompt_builder: Arc<PromptBuilder>,
        system_prompt: SharedProjectContext,
        cx: &mut Context<Self>,
    ) -> Self {
        let (detailed_summary_tx, detailed_summary_rx) = postage::watch::channel();
        let configured_model = LanguageModelRegistry::read_global(cx).default_model();

        Self {
            id: ThreadId::new(),
            updated_at: Utc::now(),
            summary: None,
            pending_summary: Task::ready(None),
            detailed_summary_task: Task::ready(None),
            detailed_summary_tx,
            detailed_summary_rx,
            completion_mode: default_completion_mode(cx),
            messages: Vec::new(),
            next_message_id: MessageId(0),
            last_prompt_id: PromptId::new(),
            project_context: system_prompt,
            checkpoints_by_message: HashMap::default(),
            completion_count: 0,
            pending_completions: Vec::new(),
            project: project.clone(),
            prompt_builder,
            tools: tools.clone(),
            last_restore_checkpoint: None,
            pending_checkpoint: None,
            tool_use: ToolUseState::new(tools.clone()),
            action_log: cx.new(|_| ActionLog::new(project.clone())),
            initial_project_snapshot: {
                // Start capturing the snapshot now; the shared task lets several consumers await it.
                let project_snapshot = Self::project_snapshot(project, cx);
                cx.foreground_executor()
                    .spawn(async move { Some(project_snapshot.await) })
                    .shared()
            },
            request_token_usage: Vec::new(),
            cumulative_token_usage: TokenUsage::default(),
            exceeded_window_error: None,
            feedback: None,
            message_feedback: HashMap::default(),
            last_auto_capture_at: None,
            request_callback: None,
            // Effectively unlimited until `set_remaining_turns` is called.
            remaining_turns: u32::MAX,
            configured_model,
        }
    }
420
    /// Rebuilds a thread from its serialized form.
    ///
    /// - The next message id is one past the id of the last serialized message (or 0 when
    ///   there are no messages).
    /// - The serialized model, if any, is selected through the registry; when there is none, or
    ///   selection yields nothing, the registry's default model is used.
    /// - Only the context *text* of each message is restored; loaded contexts and images start
    ///   empty, and creases are restored without a context handle.
    /// - Checkpoints, feedback and pending state start empty.
    ///
    /// NOTE(review): `exceeded_window_error` is reset to `None` here even though
    /// [`Thread::serialize`] writes it — confirm whether dropping it on load is intentional.
    pub fn deserialize(
        id: ThreadId,
        serialized: SerializedThread,
        project: Entity<Project>,
        tools: Entity<ToolWorkingSet>,
        prompt_builder: Arc<PromptBuilder>,
        project_context: SharedProjectContext,
        cx: &mut Context<Self>,
    ) -> Self {
        let next_message_id = MessageId(
            serialized
                .messages
                .last()
                .map(|message| message.id.0 + 1)
                .unwrap_or(0),
        );
        // Must run before `serialized.messages` is consumed below.
        let tool_use = ToolUseState::from_serialized_messages(tools.clone(), &serialized.messages);
        let (detailed_summary_tx, detailed_summary_rx) =
            postage::watch::channel_with(serialized.detailed_summary_state);

        let configured_model = LanguageModelRegistry::global(cx).update(cx, |registry, cx| {
            serialized
                .model
                .and_then(|model| {
                    let model = SelectedModel {
                        provider: model.provider.clone().into(),
                        model: model.model.clone().into(),
                    };
                    registry.select_model(&model, cx)
                })
                .or_else(|| registry.default_model())
        });

        Self {
            id,
            updated_at: serialized.updated_at,
            summary: Some(serialized.summary),
            pending_summary: Task::ready(None),
            detailed_summary_task: Task::ready(None),
            detailed_summary_tx,
            detailed_summary_rx,
            completion_mode: default_completion_mode(cx),
            messages: serialized
                .messages
                .into_iter()
                .map(|message| Message {
                    id: message.id,
                    role: message.role,
                    segments: message
                        .segments
                        .into_iter()
                        .map(|segment| match segment {
                            SerializedMessageSegment::Text { text } => MessageSegment::Text(text),
                            SerializedMessageSegment::Thinking { text, signature } => {
                                MessageSegment::Thinking { text, signature }
                            }
                            SerializedMessageSegment::RedactedThinking { data } => {
                                MessageSegment::RedactedThinking(data)
                            }
                        })
                        .collect(),
                    loaded_context: LoadedContext {
                        contexts: Vec::new(),
                        text: message.context,
                        images: Vec::new(),
                    },
                    creases: message
                        .creases
                        .into_iter()
                        .map(|crease| MessageCrease {
                            range: crease.start..crease.end,
                            metadata: CreaseMetadata {
                                icon_path: crease.icon_path,
                                label: crease.label,
                            },
                            context: None,
                        })
                        .collect(),
                })
                .collect(),
            next_message_id,
            last_prompt_id: PromptId::new(),
            project_context,
            checkpoints_by_message: HashMap::default(),
            completion_count: 0,
            pending_completions: Vec::new(),
            last_restore_checkpoint: None,
            pending_checkpoint: None,
            project: project.clone(),
            prompt_builder,
            tools,
            tool_use,
            action_log: cx.new(|_| ActionLog::new(project)),
            initial_project_snapshot: Task::ready(serialized.initial_project_snapshot).shared(),
            request_token_usage: serialized.request_token_usage,
            cumulative_token_usage: serialized.cumulative_token_usage,
            exceeded_window_error: None,
            feedback: None,
            message_feedback: HashMap::default(),
            last_auto_capture_at: None,
            request_callback: None,
            remaining_turns: u32::MAX,
            configured_model,
        }
    }
526
527 pub fn set_request_callback(
528 &mut self,
529 callback: impl 'static
530 + FnMut(&LanguageModelRequest, &[Result<LanguageModelCompletionEvent, String>]),
531 ) {
532 self.request_callback = Some(Box::new(callback));
533 }
534
    /// Returns the thread's identifier.
    pub fn id(&self) -> &ThreadId {
        &self.id
    }
538
    /// Returns `true` when the thread contains no messages.
    pub fn is_empty(&self) -> bool {
        self.messages.is_empty()
    }
542
    /// Returns the time the thread was last marked as updated.
    pub fn updated_at(&self) -> DateTime<Utc> {
        self.updated_at
    }
546
    /// Sets the last-updated time to the current UTC time.
    pub fn touch_updated_at(&mut self) {
        self.updated_at = Utc::now();
    }
550
    /// Replaces the last prompt id with a freshly generated one.
    pub fn advance_prompt_id(&mut self) {
        self.last_prompt_id = PromptId::new();
    }
554
    /// Returns a clone of the thread's summary, or `None` when no summary is set.
    pub fn summary(&self) -> Option<SharedString> {
        self.summary.clone()
    }
558
    /// Returns a clone of the shared project context handle.
    pub fn project_context(&self) -> SharedProjectContext {
        self.project_context.clone()
    }
562
563 pub fn get_or_init_configured_model(&mut self, cx: &App) -> Option<ConfiguredModel> {
564 if self.configured_model.is_none() {
565 self.configured_model = LanguageModelRegistry::read_global(cx).default_model();
566 }
567 self.configured_model.clone()
568 }
569
    /// Returns a clone of the configured model without attempting to initialize it.
    pub fn configured_model(&self) -> Option<ConfiguredModel> {
        self.configured_model.clone()
    }
573
    /// Replaces the configured model (possibly with `None`) and notifies observers.
    pub fn set_configured_model(&mut self, model: Option<ConfiguredModel>, cx: &mut Context<Self>) {
        self.configured_model = model;
        cx.notify();
    }
578
    /// Summary used when a thread has none, and substituted for an empty summary in `set_summary`.
    pub const DEFAULT_SUMMARY: SharedString = SharedString::new_static("New Thread");
580
581 pub fn summary_or_default(&self) -> SharedString {
582 self.summary.clone().unwrap_or(Self::DEFAULT_SUMMARY)
583 }
584
585 pub fn set_summary(&mut self, new_summary: impl Into<SharedString>, cx: &mut Context<Self>) {
586 let Some(current_summary) = &self.summary else {
587 // Don't allow setting summary until generated
588 return;
589 };
590
591 let mut new_summary = new_summary.into();
592
593 if new_summary.is_empty() {
594 new_summary = Self::DEFAULT_SUMMARY;
595 }
596
597 if current_summary != &new_summary {
598 self.summary = Some(new_summary);
599 cx.emit(ThreadEvent::SummaryChanged);
600 }
601 }
602
    /// Returns the thread's current completion mode.
    pub fn completion_mode(&self) -> CompletionMode {
        self.completion_mode
    }
606
    /// Sets the thread's completion mode. No event or notification is emitted.
    pub fn set_completion_mode(&mut self, mode: CompletionMode) {
        self.completion_mode = mode;
    }
610
611 pub fn message(&self, id: MessageId) -> Option<&Message> {
612 let index = self
613 .messages
614 .binary_search_by(|message| message.id.cmp(&id))
615 .ok()?;
616
617 self.messages.get(index)
618 }
619
    /// Iterates over the thread's messages in stored order.
    pub fn messages(&self) -> impl ExactSizeIterator<Item = &Message> {
        self.messages.iter()
    }
623
624 pub fn is_generating(&self) -> bool {
625 !self.pending_completions.is_empty() || !self.all_tools_finished()
626 }
627
    /// Returns the tool working set used by this thread.
    pub fn tools(&self) -> &Entity<ToolWorkingSet> {
        &self.tools
    }
631
632 pub fn pending_tool(&self, id: &LanguageModelToolUseId) -> Option<&PendingToolUse> {
633 self.tool_use
634 .pending_tool_uses()
635 .into_iter()
636 .find(|tool_use| &tool_use.id == id)
637 }
638
    /// Iterates over the pending tool uses whose status reports that confirmation is needed.
    pub fn tools_needing_confirmation(&self) -> impl Iterator<Item = &PendingToolUse> {
        self.tool_use
            .pending_tool_uses()
            .into_iter()
            .filter(|tool_use| tool_use.status.needs_confirmation())
    }
645
    /// Returns `true` when at least one tool use is pending (in any status).
    pub fn has_pending_tool_uses(&self) -> bool {
        !self.tool_use.pending_tool_uses().is_empty()
    }
649
    /// Returns a clone of the checkpoint stored for the given message, if any.
    pub fn checkpoint_for_message(&self, id: MessageId) -> Option<ThreadCheckpoint> {
        self.checkpoints_by_message.get(&id).cloned()
    }
653
    /// Restores the project to `checkpoint` and, on success, truncates the thread at the
    /// checkpoint's message.
    ///
    /// While the restore is in flight `last_restore_checkpoint` is `Pending`. On failure it
    /// becomes `Error` carrying the error's string form; on success it is cleared. In both cases
    /// the pending checkpoint is dropped and `ThreadEvent::CheckpointChanged` is emitted again.
    ///
    /// The returned task resolves to the result of the git restore, or to an error if the
    /// thread could not be updated afterwards.
    pub fn restore_checkpoint(
        &mut self,
        checkpoint: ThreadCheckpoint,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        // Publish the pending state immediately so observers can react before the restore finishes.
        self.last_restore_checkpoint = Some(LastRestoreCheckpoint::Pending {
            message_id: checkpoint.message_id,
        });
        cx.emit(ThreadEvent::CheckpointChanged);
        cx.notify();

        let git_store = self.project().read(cx).git_store().clone();
        let restore = git_store.update(cx, |git_store, cx| {
            git_store.restore_checkpoint(checkpoint.git_checkpoint.clone(), cx)
        });

        cx.spawn(async move |this, cx| {
            let result = restore.await;
            this.update(cx, |this, cx| {
                if let Err(err) = result.as_ref() {
                    this.last_restore_checkpoint = Some(LastRestoreCheckpoint::Error {
                        message_id: checkpoint.message_id,
                        error: err.to_string(),
                    });
                } else {
                    // Drop the checkpoint's message and everything after it.
                    this.truncate(checkpoint.message_id, cx);
                    this.last_restore_checkpoint = None;
                }
                this.pending_checkpoint = None;
                cx.emit(ThreadEvent::CheckpointChanged);
                cx.notify();
            })?;
            result
        })
    }
689
    /// Decides whether the pending checkpoint should be kept once generation has finished.
    ///
    /// Does nothing while the thread is still generating or when there is no pending
    /// checkpoint. Otherwise a fresh git checkpoint is taken and compared with the pending one:
    /// the pending checkpoint is stored only when the two differ. If taking the fresh
    /// checkpoint or comparing fails, the pending checkpoint is stored as well.
    fn finalize_pending_checkpoint(&mut self, cx: &mut Context<Self>) {
        let pending_checkpoint = if self.is_generating() {
            return;
        } else if let Some(checkpoint) = self.pending_checkpoint.take() {
            checkpoint
        } else {
            return;
        };

        let git_store = self.project.read(cx).git_store().clone();
        let final_checkpoint = git_store.update(cx, |git_store, cx| git_store.checkpoint(cx));
        cx.spawn(async move |this, cx| match final_checkpoint.await {
            Ok(final_checkpoint) => {
                // A failed comparison is treated as "not equal", so the checkpoint is kept.
                let equal = git_store
                    .update(cx, |store, cx| {
                        store.compare_checkpoints(
                            pending_checkpoint.git_checkpoint.clone(),
                            final_checkpoint.clone(),
                            cx,
                        )
                    })?
                    .await
                    .unwrap_or(false);

                if !equal {
                    this.update(cx, |this, cx| {
                        this.insert_checkpoint(pending_checkpoint, cx)
                    })?;
                }

                Ok(())
            }
            Err(_) => this.update(cx, |this, cx| {
                this.insert_checkpoint(pending_checkpoint, cx)
            }),
        })
        .detach();
    }
728
    /// Stores `checkpoint` under its message id (replacing any existing entry), then emits
    /// `ThreadEvent::CheckpointChanged` and notifies observers.
    fn insert_checkpoint(&mut self, checkpoint: ThreadCheckpoint, cx: &mut Context<Self>) {
        self.checkpoints_by_message
            .insert(checkpoint.message_id, checkpoint);
        cx.emit(ThreadEvent::CheckpointChanged);
        cx.notify();
    }
735
    /// Returns the state of the most recent restore if it is still pending or has failed.
    pub fn last_restore_checkpoint(&self) -> Option<&LastRestoreCheckpoint> {
        self.last_restore_checkpoint.as_ref()
    }
739
740 pub fn truncate(&mut self, message_id: MessageId, cx: &mut Context<Self>) {
741 let Some(message_ix) = self
742 .messages
743 .iter()
744 .rposition(|message| message.id == message_id)
745 else {
746 return;
747 };
748 for deleted_message in self.messages.drain(message_ix..) {
749 self.checkpoints_by_message.remove(&deleted_message.id);
750 }
751 cx.notify();
752 }
753
    /// Iterates over the loaded contexts of the message with the given id.
    ///
    /// The iterator is empty when the message does not exist or has no loaded contexts.
    pub fn context_for_message(&self, id: MessageId) -> impl Iterator<Item = &AgentContext> {
        self.messages
            .iter()
            .find(|message| message.id == id)
            .into_iter()
            .flat_map(|message| message.loaded_context.contexts.iter())
    }
761
762 pub fn is_turn_end(&self, ix: usize) -> bool {
763 if self.messages.is_empty() {
764 return false;
765 }
766
767 if !self.is_generating() && ix == self.messages.len() - 1 {
768 return true;
769 }
770
771 let Some(message) = self.messages.get(ix) else {
772 return false;
773 };
774
775 if message.role != Role::Assistant {
776 return false;
777 }
778
779 self.messages
780 .get(ix + 1)
781 .and_then(|message| {
782 self.message(message.id)
783 .map(|next_message| next_message.role == Role::User)
784 })
785 .unwrap_or(false)
786 }
787
788 /// Returns whether all of the tool uses have finished running.
789 pub fn all_tools_finished(&self) -> bool {
790 // If the only pending tool uses left are the ones with errors, then
791 // that means that we've finished running all of the pending tools.
792 self.tool_use
793 .pending_tool_uses()
794 .iter()
795 .all(|tool_use| tool_use.status.is_error())
796 }
797
    /// Returns the tool uses recorded for the given message (delegates to the tool-use state).
    pub fn tool_uses_for_message(&self, id: MessageId, cx: &App) -> Vec<ToolUse> {
        self.tool_use.tool_uses_for_message(id, cx)
    }
801
    /// Returns the tool results recorded for the given assistant message (delegates to the
    /// tool-use state).
    pub fn tool_results_for_message(
        &self,
        assistant_message_id: MessageId,
    ) -> Vec<&LanguageModelToolResult> {
        self.tool_use.tool_results_for_message(assistant_message_id)
    }
808
    /// Returns the result of the tool use with the given id, if one has been recorded.
    pub fn tool_result(&self, id: &LanguageModelToolUseId) -> Option<&LanguageModelToolResult> {
        self.tool_use.tool_result(id)
    }
812
813 pub fn output_for_tool(&self, id: &LanguageModelToolUseId) -> Option<&Arc<str>> {
814 Some(&self.tool_use.tool_result(id)?.content)
815 }
816
    /// Returns a clone of the result card for the given tool use, if one exists.
    pub fn card_for_tool(&self, id: &LanguageModelToolUseId) -> Option<AnyToolCard> {
        self.tool_use.tool_result_card(id).cloned()
    }
820
821 /// Return tools that are both enabled and supported by the model
822 pub fn available_tools(
823 &self,
824 cx: &App,
825 model: Arc<dyn LanguageModel>,
826 ) -> Vec<LanguageModelRequestTool> {
827 if model.supports_tools() {
828 self.tools()
829 .read(cx)
830 .enabled_tools(cx)
831 .into_iter()
832 .filter_map(|tool| {
833 // Skip tools that cannot be supported
834 let input_schema = tool.input_schema(model.tool_input_format()).ok()?;
835 Some(LanguageModelRequestTool {
836 name: tool.name(),
837 description: tool.description(),
838 input_schema,
839 })
840 })
841 .collect()
842 } else {
843 Vec::default()
844 }
845 }
846
847 pub fn insert_user_message(
848 &mut self,
849 text: impl Into<String>,
850 loaded_context: ContextLoadResult,
851 git_checkpoint: Option<GitStoreCheckpoint>,
852 creases: Vec<MessageCrease>,
853 cx: &mut Context<Self>,
854 ) -> MessageId {
855 if !loaded_context.referenced_buffers.is_empty() {
856 self.action_log.update(cx, |log, cx| {
857 for buffer in loaded_context.referenced_buffers {
858 log.track_buffer(buffer, cx);
859 }
860 });
861 }
862
863 let message_id = self.insert_message(
864 Role::User,
865 vec![MessageSegment::Text(text.into())],
866 loaded_context.loaded_context,
867 creases,
868 cx,
869 );
870
871 if let Some(git_checkpoint) = git_checkpoint {
872 self.pending_checkpoint = Some(ThreadCheckpoint {
873 message_id,
874 git_checkpoint,
875 });
876 }
877
878 self.auto_capture_telemetry(cx);
879
880 message_id
881 }
882
883 pub fn insert_assistant_message(
884 &mut self,
885 segments: Vec<MessageSegment>,
886 cx: &mut Context<Self>,
887 ) -> MessageId {
888 self.insert_message(
889 Role::Assistant,
890 segments,
891 LoadedContext::default(),
892 Vec::new(),
893 cx,
894 )
895 }
896
897 pub fn insert_message(
898 &mut self,
899 role: Role,
900 segments: Vec<MessageSegment>,
901 loaded_context: LoadedContext,
902 creases: Vec<MessageCrease>,
903 cx: &mut Context<Self>,
904 ) -> MessageId {
905 let id = self.next_message_id.post_inc();
906 self.messages.push(Message {
907 id,
908 role,
909 segments,
910 loaded_context,
911 creases,
912 });
913 self.touch_updated_at();
914 cx.emit(ThreadEvent::MessageAdded(id));
915 id
916 }
917
918 pub fn edit_message(
919 &mut self,
920 id: MessageId,
921 new_role: Role,
922 new_segments: Vec<MessageSegment>,
923 loaded_context: Option<LoadedContext>,
924 cx: &mut Context<Self>,
925 ) -> bool {
926 let Some(message) = self.messages.iter_mut().find(|message| message.id == id) else {
927 return false;
928 };
929 message.role = new_role;
930 message.segments = new_segments;
931 if let Some(context) = loaded_context {
932 message.loaded_context = context;
933 }
934 self.touch_updated_at();
935 cx.emit(ThreadEvent::MessageEdited(id));
936 true
937 }
938
939 pub fn delete_message(&mut self, id: MessageId, cx: &mut Context<Self>) -> bool {
940 let Some(index) = self.messages.iter().position(|message| message.id == id) else {
941 return false;
942 };
943 self.messages.remove(index);
944 self.touch_updated_at();
945 cx.emit(ThreadEvent::MessageDeleted(id));
946 true
947 }
948
949 /// Returns the representation of this [`Thread`] in a textual form.
950 ///
951 /// This is the representation we use when attaching a thread as context to another thread.
952 pub fn text(&self) -> String {
953 let mut text = String::new();
954
955 for message in &self.messages {
956 text.push_str(match message.role {
957 language_model::Role::User => "User:",
958 language_model::Role::Assistant => "Assistant:",
959 language_model::Role::System => "System:",
960 });
961 text.push('\n');
962
963 for segment in &message.segments {
964 match segment {
965 MessageSegment::Text(content) => text.push_str(content),
966 MessageSegment::Thinking { text: content, .. } => {
967 text.push_str(&format!("<think>{}</think>", content))
968 }
969 MessageSegment::RedactedThinking(_) => {}
970 }
971 }
972 text.push('\n');
973 }
974
975 text
976 }
977
    /// Serializes this thread into a format for storage or telemetry.
    ///
    /// The returned task first awaits the initial project snapshot and then reads the thread's
    /// current state, so the result reflects the thread at the time the snapshot resolves. The
    /// summary falls back to the default summary, and only the text of each message's loaded
    /// context is stored. The task fails if the thread can no longer be read.
    pub fn serialize(&self, cx: &mut Context<Self>) -> Task<Result<SerializedThread>> {
        let initial_project_snapshot = self.initial_project_snapshot.clone();
        cx.spawn(async move |this, cx| {
            let initial_project_snapshot = initial_project_snapshot.await;
            this.read_with(cx, |this, cx| SerializedThread {
                version: SerializedThread::VERSION.to_string(),
                summary: this.summary_or_default(),
                updated_at: this.updated_at(),
                messages: this
                    .messages()
                    .map(|message| SerializedMessage {
                        id: message.id,
                        role: message.role,
                        segments: message
                            .segments
                            .iter()
                            .map(|segment| match segment {
                                MessageSegment::Text(text) => {
                                    SerializedMessageSegment::Text { text: text.clone() }
                                }
                                MessageSegment::Thinking { text, signature } => {
                                    SerializedMessageSegment::Thinking {
                                        text: text.clone(),
                                        signature: signature.clone(),
                                    }
                                }
                                MessageSegment::RedactedThinking(data) => {
                                    SerializedMessageSegment::RedactedThinking {
                                        data: data.clone(),
                                    }
                                }
                            })
                            .collect(),
                        // Tool uses and results are stored alongside the message they belong to.
                        tool_uses: this
                            .tool_uses_for_message(message.id, cx)
                            .into_iter()
                            .map(|tool_use| SerializedToolUse {
                                id: tool_use.id,
                                name: tool_use.name,
                                input: tool_use.input,
                            })
                            .collect(),
                        tool_results: this
                            .tool_results_for_message(message.id)
                            .into_iter()
                            .map(|tool_result| SerializedToolResult {
                                tool_use_id: tool_result.tool_use_id.clone(),
                                is_error: tool_result.is_error,
                                content: tool_result.content.clone(),
                            })
                            .collect(),
                        context: message.loaded_context.text.clone(),
                        creases: message
                            .creases
                            .iter()
                            .map(|crease| SerializedCrease {
                                start: crease.range.start,
                                end: crease.range.end,
                                icon_path: crease.metadata.icon_path.clone(),
                                label: crease.metadata.label.clone(),
                            })
                            .collect(),
                    })
                    .collect(),
                initial_project_snapshot,
                cumulative_token_usage: this.cumulative_token_usage,
                request_token_usage: this.request_token_usage.clone(),
                detailed_summary_state: this.detailed_summary_rx.borrow().clone(),
                exceeded_window_error: this.exceeded_window_error.clone(),
                // Only the provider and model ids are persisted for the configured model.
                model: this
                    .configured_model
                    .as_ref()
                    .map(|model| SerializedLanguageModel {
                        provider: model.provider.id().0.to_string(),
                        model: model.model.id().0.to_string(),
                    }),
            })
        })
    }
1058
    /// Returns how many more times `send_to_model` may issue a request.
    pub fn remaining_turns(&self) -> u32 {
        self.remaining_turns
    }
1062
    /// Sets how many more times `send_to_model` may issue a request; `0` disables sending.
    pub fn set_remaining_turns(&mut self, remaining_turns: u32) {
        self.remaining_turns = remaining_turns;
    }
1066
1067 pub fn send_to_model(
1068 &mut self,
1069 model: Arc<dyn LanguageModel>,
1070 window: Option<AnyWindowHandle>,
1071 cx: &mut Context<Self>,
1072 ) {
1073 if self.remaining_turns == 0 {
1074 return;
1075 }
1076
1077 self.remaining_turns -= 1;
1078
1079 let request = self.to_completion_request(model.clone(), cx);
1080
1081 self.stream_completion(request, model, window, cx);
1082 }
1083
1084 pub fn used_tools_since_last_user_message(&self) -> bool {
1085 for message in self.messages.iter().rev() {
1086 if self.tool_use.message_has_tool_results(message.id) {
1087 return true;
1088 } else if message.role == Role::User {
1089 return false;
1090 }
1091 }
1092
1093 false
1094 }
1095
1096 pub fn to_completion_request(
1097 &self,
1098 model: Arc<dyn LanguageModel>,
1099 cx: &mut Context<Self>,
1100 ) -> LanguageModelRequest {
1101 let mut request = LanguageModelRequest {
1102 thread_id: Some(self.id.to_string()),
1103 prompt_id: Some(self.last_prompt_id.to_string()),
1104 mode: None,
1105 messages: vec![],
1106 tools: Vec::new(),
1107 stop: Vec::new(),
1108 temperature: None,
1109 };
1110
1111 let available_tools = self.available_tools(cx, model.clone());
1112 let available_tool_names = available_tools
1113 .iter()
1114 .map(|tool| tool.name.clone())
1115 .collect();
1116
1117 let model_context = &ModelContext {
1118 available_tools: available_tool_names,
1119 };
1120
1121 if let Some(project_context) = self.project_context.borrow().as_ref() {
1122 match self
1123 .prompt_builder
1124 .generate_assistant_system_prompt(project_context, model_context)
1125 {
1126 Err(err) => {
1127 let message = format!("{err:?}").into();
1128 log::error!("{message}");
1129 cx.emit(ThreadEvent::ShowError(ThreadError::Message {
1130 header: "Error generating system prompt".into(),
1131 message,
1132 }));
1133 }
1134 Ok(system_prompt) => {
1135 request.messages.push(LanguageModelRequestMessage {
1136 role: Role::System,
1137 content: vec![MessageContent::Text(system_prompt)],
1138 cache: true,
1139 });
1140 }
1141 }
1142 } else {
1143 let message = "Context for system prompt unexpectedly not ready.".into();
1144 log::error!("{message}");
1145 cx.emit(ThreadEvent::ShowError(ThreadError::Message {
1146 header: "Error generating system prompt".into(),
1147 message,
1148 }));
1149 }
1150
1151 for message in &self.messages {
1152 let mut request_message = LanguageModelRequestMessage {
1153 role: message.role,
1154 content: Vec::new(),
1155 cache: false,
1156 };
1157
1158 message
1159 .loaded_context
1160 .add_to_request_message(&mut request_message);
1161
1162 for segment in &message.segments {
1163 match segment {
1164 MessageSegment::Text(text) => {
1165 if !text.is_empty() {
1166 request_message
1167 .content
1168 .push(MessageContent::Text(text.into()));
1169 }
1170 }
1171 MessageSegment::Thinking { text, signature } => {
1172 if !text.is_empty() {
1173 request_message.content.push(MessageContent::Thinking {
1174 text: text.into(),
1175 signature: signature.clone(),
1176 });
1177 }
1178 }
1179 MessageSegment::RedactedThinking(data) => {
1180 request_message
1181 .content
1182 .push(MessageContent::RedactedThinking(data.clone()));
1183 }
1184 };
1185 }
1186
1187 self.tool_use
1188 .attach_tool_uses(message.id, &mut request_message);
1189
1190 request.messages.push(request_message);
1191
1192 if let Some(tool_results_message) = self.tool_use.tool_results_message(message.id) {
1193 request.messages.push(tool_results_message);
1194 }
1195 }
1196
1197 // https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching
1198 if let Some(last) = request.messages.last_mut() {
1199 last.cache = true;
1200 }
1201
1202 self.attached_tracked_files_state(&mut request.messages, cx);
1203
1204 request.tools = available_tools;
1205 request.mode = if model.supports_max_mode() {
1206 Some(self.completion_mode)
1207 } else {
1208 Some(CompletionMode::Normal)
1209 };
1210
1211 request
1212 }
1213
1214 fn to_summarize_request(&self, added_user_message: String) -> LanguageModelRequest {
1215 let mut request = LanguageModelRequest {
1216 thread_id: None,
1217 prompt_id: None,
1218 mode: None,
1219 messages: vec![],
1220 tools: Vec::new(),
1221 stop: Vec::new(),
1222 temperature: None,
1223 };
1224
1225 for message in &self.messages {
1226 let mut request_message = LanguageModelRequestMessage {
1227 role: message.role,
1228 content: Vec::new(),
1229 cache: false,
1230 };
1231
1232 for segment in &message.segments {
1233 match segment {
1234 MessageSegment::Text(text) => request_message
1235 .content
1236 .push(MessageContent::Text(text.clone())),
1237 MessageSegment::Thinking { .. } => {}
1238 MessageSegment::RedactedThinking(_) => {}
1239 }
1240 }
1241
1242 if request_message.content.is_empty() {
1243 continue;
1244 }
1245
1246 request.messages.push(request_message);
1247 }
1248
1249 request.messages.push(LanguageModelRequestMessage {
1250 role: Role::User,
1251 content: vec![MessageContent::Text(added_user_message)],
1252 cache: false,
1253 });
1254
1255 request
1256 }
1257
1258 fn attached_tracked_files_state(
1259 &self,
1260 messages: &mut Vec<LanguageModelRequestMessage>,
1261 cx: &App,
1262 ) {
1263 const STALE_FILES_HEADER: &str = "These files changed since last read:";
1264
1265 let mut stale_message = String::new();
1266
1267 let action_log = self.action_log.read(cx);
1268
1269 for stale_file in action_log.stale_buffers(cx) {
1270 let Some(file) = stale_file.read(cx).file() else {
1271 continue;
1272 };
1273
1274 if stale_message.is_empty() {
1275 write!(&mut stale_message, "{}\n", STALE_FILES_HEADER).ok();
1276 }
1277
1278 writeln!(&mut stale_message, "- {}", file.path().display()).ok();
1279 }
1280
1281 let mut content = Vec::with_capacity(2);
1282
1283 if !stale_message.is_empty() {
1284 content.push(stale_message.into());
1285 }
1286
1287 if !content.is_empty() {
1288 let context_message = LanguageModelRequestMessage {
1289 role: Role::User,
1290 content,
1291 cache: false,
1292 };
1293
1294 messages.push(context_message);
1295 }
1296 }
1297
/// Sends `request` to `model` and applies the streamed response to this thread.
///
/// The work runs in a spawned task that is stored in `pending_completions` under a
/// fresh completion id. While streaming, each event updates the thread (new assistant
/// messages, text/thinking chunks, token usage, tool-use requests). When the stream
/// ends, the pending completion is removed and a summary may be started. Afterwards
/// the outcome is handled: pending tools are started when the model stopped for tool
/// use, errors are translated into `ThreadEvent::ShowError` (or recorded as an exceeded
/// context window) and the last completion is canceled, `ThreadEvent::Stopped` is
/// emitted, the optional request callback is invoked, and a telemetry event with the
/// token usage of this completion is reported.
pub fn stream_completion(
    &mut self,
    request: LanguageModelRequest,
    model: Arc<dyn LanguageModel>,
    window: Option<AnyWindowHandle>,
    cx: &mut Context<Self>,
) {
    let pending_completion_id = post_inc(&mut self.completion_count);
    // Only keep a copy of the request (and collect response events) when a request
    // callback is installed to receive them.
    let mut request_callback_parameters = if self.request_callback.is_some() {
        Some((request.clone(), Vec::new()))
    } else {
        None
    };
    let prompt_id = self.last_prompt_id.clone();
    let tool_use_metadata = ToolUseMetadata {
        model: model.clone(),
        thread_id: self.id.clone(),
        prompt_id: prompt_id.clone(),
    };

    let task = cx.spawn(async move |thread, cx| {
        let stream_completion_future = model.stream_completion_with_usage(request, &cx);
        // Snapshot of the cumulative usage before this completion, used to report
        // only this completion's usage in telemetry below.
        let initial_token_usage =
            thread.read_with(cx, |thread, _cx| thread.cumulative_token_usage);
        let stream_completion = async {
            let (mut events, usage) = stream_completion_future.await?;

            let mut stop_reason = StopReason::EndTurn;
            let mut current_token_usage = TokenUsage::default();

            thread
                .update(cx, |_thread, cx| {
                    if let Some(usage) = usage {
                        cx.emit(ThreadEvent::UsageUpdated(usage));
                    }
                    cx.emit(ThreadEvent::NewRequest);
                })
                .ok();

            // Id of the assistant message created for this request, if any yet.
            let mut request_assistant_message_id = None;

            while let Some(event) = events.next().await {
                if let Some((_, response_events)) = request_callback_parameters.as_mut() {
                    response_events
                        .push(event.as_ref().map_err(|error| error.to_string()).cloned());
                }

                thread.update(cx, |thread, cx| {
                    let event = match event {
                        Ok(event) => event,
                        // Invalid tool input is recorded as a failed tool use instead
                        // of aborting the whole stream.
                        Err(LanguageModelCompletionError::BadInputJson {
                            id,
                            tool_name,
                            raw_input: invalid_input_json,
                            json_parse_error,
                        }) => {
                            thread.receive_invalid_tool_json(
                                id,
                                tool_name,
                                invalid_input_json,
                                json_parse_error,
                                window,
                                cx,
                            );
                            return Ok(());
                        }
                        Err(LanguageModelCompletionError::Other(error)) => {
                            return Err(error);
                        }
                    };

                    match event {
                        LanguageModelCompletionEvent::StartMessage { .. } => {
                            request_assistant_message_id =
                                Some(thread.insert_assistant_message(
                                    vec![MessageSegment::Text(String::new())],
                                    cx,
                                ));
                        }
                        LanguageModelCompletionEvent::Stop(reason) => {
                            stop_reason = reason;
                        }
                        LanguageModelCompletionEvent::UsageUpdate(token_usage) => {
                            thread.update_token_usage_at_last_message(token_usage);
                            // Replace the previously reported usage of this
                            // completion with the new value in the cumulative total.
                            thread.cumulative_token_usage = thread.cumulative_token_usage
                                + token_usage
                                - current_token_usage;
                            current_token_usage = token_usage;
                        }
                        LanguageModelCompletionEvent::Text(chunk) => {
                            cx.emit(ThreadEvent::ReceivedTextChunk);
                            if let Some(last_message) = thread.messages.last_mut() {
                                if last_message.role == Role::Assistant
                                    && !thread.tool_use.has_tool_results(last_message.id)
                                {
                                    last_message.push_text(&chunk);
                                    cx.emit(ThreadEvent::StreamedAssistantText(
                                        last_message.id,
                                        chunk,
                                    ));
                                } else {
                                    // If we don't have an Assistant message yet, assume this chunk marks the beginning
                                    // of a new Assistant response.
                                    //
                                    // Importantly: We do *not* want to emit a `StreamedAssistantText` event here, as it
                                    // will result in duplicating the text of the chunk in the rendered Markdown.
                                    request_assistant_message_id =
                                        Some(thread.insert_assistant_message(
                                            vec![MessageSegment::Text(chunk.to_string())],
                                            cx,
                                        ));
                                };
                            }
                        }
                        LanguageModelCompletionEvent::Thinking {
                            text: chunk,
                            signature,
                        } => {
                            if let Some(last_message) = thread.messages.last_mut() {
                                if last_message.role == Role::Assistant
                                    && !thread.tool_use.has_tool_results(last_message.id)
                                {
                                    last_message.push_thinking(&chunk, signature);
                                    cx.emit(ThreadEvent::StreamedAssistantThinking(
                                        last_message.id,
                                        chunk,
                                    ));
                                } else {
                                    // If we don't have an Assistant message yet, assume this chunk marks the beginning
                                    // of a new Assistant response.
                                    //
                                    // Importantly: We do *not* want to emit a `StreamedAssistantThinking` event here, as it
                                    // will result in duplicating the text of the chunk in the rendered Markdown.
                                    request_assistant_message_id =
                                        Some(thread.insert_assistant_message(
                                            vec![MessageSegment::Thinking {
                                                text: chunk.to_string(),
                                                signature,
                                            }],
                                            cx,
                                        ));
                                };
                            }
                        }
                        LanguageModelCompletionEvent::ToolUse(tool_use) => {
                            // Tool uses attach to this request's assistant message,
                            // creating an empty one when none exists yet.
                            let last_assistant_message_id = request_assistant_message_id
                                .unwrap_or_else(|| {
                                    let new_assistant_message_id =
                                        thread.insert_assistant_message(vec![], cx);
                                    request_assistant_message_id =
                                        Some(new_assistant_message_id);
                                    new_assistant_message_id
                                });

                            let tool_use_id = tool_use.id.clone();
                            // Only incomplete (still streaming) input is forwarded
                            // to listeners as a `StreamedToolUse` event.
                            let streamed_input = if tool_use.is_input_complete {
                                None
                            } else {
                                Some((&tool_use.input).clone())
                            };

                            let ui_text = thread.tool_use.request_tool_use(
                                last_assistant_message_id,
                                tool_use,
                                tool_use_metadata.clone(),
                                cx,
                            );

                            if let Some(input) = streamed_input {
                                cx.emit(ThreadEvent::StreamedToolUse {
                                    tool_use_id,
                                    ui_text,
                                    input,
                                });
                            }
                        }
                    }

                    thread.touch_updated_at();
                    cx.emit(ThreadEvent::StreamedCompletion);
                    cx.notify();

                    thread.auto_capture_telemetry(cx);
                    Ok(())
                })??;

                smol::future::yield_now().await;
            }

            thread.update(cx, |thread, cx| {
                thread
                    .pending_completions
                    .retain(|completion| completion.id != pending_completion_id);

                // If there is a response without tool use, summarize the message. Otherwise,
                // allow two tool uses before summarizing.
                if thread.summary.is_none()
                    && thread.messages.len() >= 2
                    && (!thread.has_pending_tool_uses() || thread.messages.len() >= 6)
                {
                    thread.summarize(cx);
                }
            })?;

            anyhow::Ok(stop_reason)
        };

        let result = stream_completion.await;

        thread
            .update(cx, |thread, cx| {
                thread.finalize_pending_checkpoint(cx);
                match result.as_ref() {
                    Ok(stop_reason) => match stop_reason {
                        StopReason::ToolUse => {
                            let tool_uses = thread.use_pending_tools(window, cx, model.clone());
                            cx.emit(ThreadEvent::UsePendingTools { tool_uses });
                        }
                        StopReason::EndTurn => {}
                        StopReason::MaxTokens => {}
                    },
                    Err(error) => {
                        // Known error types get dedicated UI errors; anything else
                        // is shown as a generic message built from the error chain.
                        if error.is::<PaymentRequiredError>() {
                            cx.emit(ThreadEvent::ShowError(ThreadError::PaymentRequired));
                        } else if error.is::<MaxMonthlySpendReachedError>() {
                            cx.emit(ThreadEvent::ShowError(
                                ThreadError::MaxMonthlySpendReached,
                            ));
                        } else if let Some(error) =
                            error.downcast_ref::<ModelRequestLimitReachedError>()
                        {
                            cx.emit(ThreadEvent::ShowError(
                                ThreadError::ModelRequestLimitReached { plan: error.plan },
                            ));
                        } else if let Some(known_error) =
                            error.downcast_ref::<LanguageModelKnownError>()
                        {
                            match known_error {
                                LanguageModelKnownError::ContextWindowLimitExceeded {
                                    tokens,
                                } => {
                                    thread.exceeded_window_error = Some(ExceededWindowError {
                                        model_id: model.id(),
                                        token_count: *tokens,
                                    });
                                    cx.notify();
                                }
                            }
                        } else {
                            let error_message = error
                                .chain()
                                .map(|err| err.to_string())
                                .collect::<Vec<_>>()
                                .join("\n");
                            cx.emit(ThreadEvent::ShowError(ThreadError::Message {
                                header: "Error interacting with language model".into(),
                                message: SharedString::from(error_message.clone()),
                            }));
                        }

                        thread.cancel_last_completion(window, cx);
                    }
                }
                cx.emit(ThreadEvent::Stopped(result.map_err(Arc::new)));

                if let Some((request_callback, (request, response_events))) = thread
                    .request_callback
                    .as_mut()
                    .zip(request_callback_parameters.as_ref())
                {
                    request_callback(request, response_events);
                }

                thread.auto_capture_telemetry(cx);

                if let Ok(initial_usage) = initial_token_usage {
                    // Usage attributable to this completion alone.
                    let usage = thread.cumulative_token_usage - initial_usage;

                    telemetry::event!(
                        "Assistant Thread Completion",
                        thread_id = thread.id().to_string(),
                        prompt_id = prompt_id,
                        model = model.telemetry_id(),
                        model_provider = model.provider_id().to_string(),
                        input_tokens = usage.input_tokens,
                        output_tokens = usage.output_tokens,
                        cache_creation_input_tokens = usage.cache_creation_input_tokens,
                        cache_read_input_tokens = usage.cache_read_input_tokens,
                    );
                }
            })
            .ok();
    });

    self.pending_completions.push(PendingCompletion {
        id: pending_completion_id,
        _task: task,
    });
}
1597
1598 pub fn summarize(&mut self, cx: &mut Context<Self>) {
1599 let Some(model) = LanguageModelRegistry::read_global(cx).thread_summary_model() else {
1600 return;
1601 };
1602
1603 if !model.provider.is_authenticated(cx) {
1604 return;
1605 }
1606
1607 let added_user_message = "Generate a concise 3-7 word title for this conversation, omitting punctuation. \
1608 Go straight to the title, without any preamble and prefix like `Here's a concise suggestion:...` or `Title:`. \
1609 If the conversation is about a specific subject, include it in the title. \
1610 Be descriptive. DO NOT speak in the first person.";
1611
1612 let request = self.to_summarize_request(added_user_message.into());
1613
1614 self.pending_summary = cx.spawn(async move |this, cx| {
1615 async move {
1616 let stream = model.model.stream_completion_text_with_usage(request, &cx);
1617 let (mut messages, usage) = stream.await?;
1618
1619 if let Some(usage) = usage {
1620 this.update(cx, |_thread, cx| {
1621 cx.emit(ThreadEvent::UsageUpdated(usage));
1622 })
1623 .ok();
1624 }
1625
1626 let mut new_summary = String::new();
1627 while let Some(message) = messages.stream.next().await {
1628 let text = message?;
1629 let mut lines = text.lines();
1630 new_summary.extend(lines.next());
1631
1632 // Stop if the LLM generated multiple lines.
1633 if lines.next().is_some() {
1634 break;
1635 }
1636 }
1637
1638 this.update(cx, |this, cx| {
1639 if !new_summary.is_empty() {
1640 this.summary = Some(new_summary.into());
1641 }
1642
1643 cx.emit(ThreadEvent::SummaryGenerated);
1644 })?;
1645
1646 anyhow::Ok(())
1647 }
1648 .log_err()
1649 .await
1650 });
1651 }
1652
1653 pub fn start_generating_detailed_summary_if_needed(
1654 &mut self,
1655 thread_store: WeakEntity<ThreadStore>,
1656 cx: &mut Context<Self>,
1657 ) {
1658 let Some(last_message_id) = self.messages.last().map(|message| message.id) else {
1659 return;
1660 };
1661
1662 match &*self.detailed_summary_rx.borrow() {
1663 DetailedSummaryState::Generating { message_id, .. }
1664 | DetailedSummaryState::Generated { message_id, .. }
1665 if *message_id == last_message_id =>
1666 {
1667 // Already up-to-date
1668 return;
1669 }
1670 _ => {}
1671 }
1672
1673 let Some(ConfiguredModel { model, provider }) =
1674 LanguageModelRegistry::read_global(cx).thread_summary_model()
1675 else {
1676 return;
1677 };
1678
1679 if !provider.is_authenticated(cx) {
1680 return;
1681 }
1682
1683 let added_user_message = "Generate a detailed summary of this conversation. Include:\n\
1684 1. A brief overview of what was discussed\n\
1685 2. Key facts or information discovered\n\
1686 3. Outcomes or conclusions reached\n\
1687 4. Any action items or next steps if any\n\
1688 Format it in Markdown with headings and bullet points.";
1689
1690 let request = self.to_summarize_request(added_user_message.into());
1691
1692 *self.detailed_summary_tx.borrow_mut() = DetailedSummaryState::Generating {
1693 message_id: last_message_id,
1694 };
1695
1696 // Replace the detailed summarization task if there is one, cancelling it. It would probably
1697 // be better to allow the old task to complete, but this would require logic for choosing
1698 // which result to prefer (the old task could complete after the new one, resulting in a
1699 // stale summary).
1700 self.detailed_summary_task = cx.spawn(async move |thread, cx| {
1701 let stream = model.stream_completion_text(request, &cx);
1702 let Some(mut messages) = stream.await.log_err() else {
1703 thread
1704 .update(cx, |thread, _cx| {
1705 *thread.detailed_summary_tx.borrow_mut() =
1706 DetailedSummaryState::NotGenerated;
1707 })
1708 .ok()?;
1709 return None;
1710 };
1711
1712 let mut new_detailed_summary = String::new();
1713
1714 while let Some(chunk) = messages.stream.next().await {
1715 if let Some(chunk) = chunk.log_err() {
1716 new_detailed_summary.push_str(&chunk);
1717 }
1718 }
1719
1720 thread
1721 .update(cx, |thread, _cx| {
1722 *thread.detailed_summary_tx.borrow_mut() = DetailedSummaryState::Generated {
1723 text: new_detailed_summary.into(),
1724 message_id: last_message_id,
1725 };
1726 })
1727 .ok()?;
1728
1729 // Save thread so its summary can be reused later
1730 if let Some(thread) = thread.upgrade() {
1731 if let Ok(Ok(save_task)) = cx.update(|cx| {
1732 thread_store
1733 .update(cx, |thread_store, cx| thread_store.save_thread(&thread, cx))
1734 }) {
1735 save_task.await.log_err();
1736 }
1737 }
1738
1739 Some(())
1740 });
1741 }
1742
1743 pub async fn wait_for_detailed_summary_or_text(
1744 this: &Entity<Self>,
1745 cx: &mut AsyncApp,
1746 ) -> Option<SharedString> {
1747 let mut detailed_summary_rx = this
1748 .read_with(cx, |this, _cx| this.detailed_summary_rx.clone())
1749 .ok()?;
1750 loop {
1751 match detailed_summary_rx.recv().await? {
1752 DetailedSummaryState::Generating { .. } => {}
1753 DetailedSummaryState::NotGenerated => {
1754 return this.read_with(cx, |this, _cx| this.text().into()).ok();
1755 }
1756 DetailedSummaryState::Generated { text, .. } => return Some(text),
1757 }
1758 }
1759 }
1760
/// Returns the text of the current detailed-summary state without waiting.
///
/// Falls back to the thread's own text when the current state carries no text.
pub fn latest_detailed_summary_or_text(&self) -> SharedString {
    self.detailed_summary_rx
        .borrow()
        .text()
        .unwrap_or_else(|| self.text().into())
}
1767
/// Returns `true` while the detailed-summary state is `Generating`.
pub fn is_generating_detailed_summary(&self) -> bool {
    matches!(
        &*self.detailed_summary_rx.borrow(),
        DetailedSummaryState::Generating { .. }
    )
}
1774
1775 pub fn use_pending_tools(
1776 &mut self,
1777 window: Option<AnyWindowHandle>,
1778 cx: &mut Context<Self>,
1779 model: Arc<dyn LanguageModel>,
1780 ) -> Vec<PendingToolUse> {
1781 self.auto_capture_telemetry(cx);
1782 let request = self.to_completion_request(model, cx);
1783 let messages = Arc::new(request.messages);
1784 let pending_tool_uses = self
1785 .tool_use
1786 .pending_tool_uses()
1787 .into_iter()
1788 .filter(|tool_use| tool_use.status.is_idle())
1789 .cloned()
1790 .collect::<Vec<_>>();
1791
1792 for tool_use in pending_tool_uses.iter() {
1793 if let Some(tool) = self.tools.read(cx).tool(&tool_use.name, cx) {
1794 if tool.needs_confirmation(&tool_use.input, cx)
1795 && !AssistantSettings::get_global(cx).always_allow_tool_actions
1796 {
1797 self.tool_use.confirm_tool_use(
1798 tool_use.id.clone(),
1799 tool_use.ui_text.clone(),
1800 tool_use.input.clone(),
1801 messages.clone(),
1802 tool,
1803 );
1804 cx.emit(ThreadEvent::ToolConfirmationNeeded);
1805 } else {
1806 self.run_tool(
1807 tool_use.id.clone(),
1808 tool_use.ui_text.clone(),
1809 tool_use.input.clone(),
1810 &messages,
1811 tool,
1812 window,
1813 cx,
1814 );
1815 }
1816 }
1817 }
1818
1819 pending_tool_uses
1820 }
1821
1822 pub fn receive_invalid_tool_json(
1823 &mut self,
1824 tool_use_id: LanguageModelToolUseId,
1825 tool_name: Arc<str>,
1826 invalid_json: Arc<str>,
1827 error: String,
1828 window: Option<AnyWindowHandle>,
1829 cx: &mut Context<Thread>,
1830 ) {
1831 log::error!("The model returned invalid input JSON: {invalid_json}");
1832
1833 let pending_tool_use = self.tool_use.insert_tool_output(
1834 tool_use_id.clone(),
1835 tool_name,
1836 Err(anyhow!("Error parsing input JSON: {error}")),
1837 self.configured_model.as_ref(),
1838 );
1839 let ui_text = if let Some(pending_tool_use) = &pending_tool_use {
1840 pending_tool_use.ui_text.clone()
1841 } else {
1842 log::error!(
1843 "There was no pending tool use for tool use {tool_use_id}, even though it finished (with invalid input JSON)."
1844 );
1845 format!("Unknown tool {}", tool_use_id).into()
1846 };
1847
1848 cx.emit(ThreadEvent::InvalidToolInput {
1849 tool_use_id: tool_use_id.clone(),
1850 ui_text,
1851 invalid_input_json: invalid_json,
1852 });
1853
1854 self.tool_finished(tool_use_id, pending_tool_use, false, window, cx);
1855 }
1856
/// Runs `tool` for the given tool use and registers the running task with the tool-use state.
///
/// The task is created by `spawn_tool_use`; `ui_text` is the label stored alongside the
/// running tool use.
pub fn run_tool(
    &mut self,
    tool_use_id: LanguageModelToolUseId,
    ui_text: impl Into<SharedString>,
    input: serde_json::Value,
    messages: &[LanguageModelRequestMessage],
    tool: Arc<dyn Tool>,
    window: Option<AnyWindowHandle>,
    cx: &mut Context<Thread>,
) {
    let task = self.spawn_tool_use(tool_use_id.clone(), messages, input, tool, window, cx);
    self.tool_use
        .run_pending_tool(tool_use_id, ui_text.into(), task);
}
1871
1872 fn spawn_tool_use(
1873 &mut self,
1874 tool_use_id: LanguageModelToolUseId,
1875 messages: &[LanguageModelRequestMessage],
1876 input: serde_json::Value,
1877 tool: Arc<dyn Tool>,
1878 window: Option<AnyWindowHandle>,
1879 cx: &mut Context<Thread>,
1880 ) -> Task<()> {
1881 let tool_name: Arc<str> = tool.name().into();
1882
1883 let tool_result = if self.tools.read(cx).is_disabled(&tool.source(), &tool_name) {
1884 Task::ready(Err(anyhow!("tool is disabled: {tool_name}"))).into()
1885 } else {
1886 tool.run(
1887 input,
1888 messages,
1889 self.project.clone(),
1890 self.action_log.clone(),
1891 window,
1892 cx,
1893 )
1894 };
1895
1896 // Store the card separately if it exists
1897 if let Some(card) = tool_result.card.clone() {
1898 self.tool_use
1899 .insert_tool_result_card(tool_use_id.clone(), card);
1900 }
1901
1902 cx.spawn({
1903 async move |thread: WeakEntity<Thread>, cx| {
1904 let output = tool_result.output.await;
1905
1906 thread
1907 .update(cx, |thread, cx| {
1908 let pending_tool_use = thread.tool_use.insert_tool_output(
1909 tool_use_id.clone(),
1910 tool_name,
1911 output,
1912 thread.configured_model.as_ref(),
1913 );
1914 thread.tool_finished(tool_use_id, pending_tool_use, false, window, cx);
1915 })
1916 .ok();
1917 }
1918 })
1919 }
1920
1921 fn tool_finished(
1922 &mut self,
1923 tool_use_id: LanguageModelToolUseId,
1924 pending_tool_use: Option<PendingToolUse>,
1925 canceled: bool,
1926 window: Option<AnyWindowHandle>,
1927 cx: &mut Context<Self>,
1928 ) {
1929 if self.all_tools_finished() {
1930 if let Some(ConfiguredModel { model, .. }) = self.configured_model.as_ref() {
1931 if !canceled {
1932 self.send_to_model(model.clone(), window, cx);
1933 }
1934 self.auto_capture_telemetry(cx);
1935 }
1936 }
1937
1938 cx.emit(ThreadEvent::ToolFinished {
1939 tool_use_id,
1940 pending_tool_use,
1941 });
1942 }
1943
1944 /// Cancels the last pending completion, if there are any pending.
1945 ///
1946 /// Returns whether a completion was canceled.
1947 pub fn cancel_last_completion(
1948 &mut self,
1949 window: Option<AnyWindowHandle>,
1950 cx: &mut Context<Self>,
1951 ) -> bool {
1952 let mut canceled = self.pending_completions.pop().is_some();
1953
1954 for pending_tool_use in self.tool_use.cancel_pending() {
1955 canceled = true;
1956 self.tool_finished(
1957 pending_tool_use.id.clone(),
1958 Some(pending_tool_use),
1959 true,
1960 window,
1961 cx,
1962 );
1963 }
1964
1965 self.finalize_pending_checkpoint(cx);
1966
1967 if canceled {
1968 cx.emit(ThreadEvent::CompletionCanceled);
1969 }
1970
1971 canceled
1972 }
1973
/// Signals that any in-progress editing should be canceled.
///
/// This only emits `ThreadEvent::CancelEditing`; it is up to listeners (like
/// ActiveThread) to cancel their editing operations in response.
pub fn cancel_editing(&mut self, cx: &mut Context<Self>) {
    cx.emit(ThreadEvent::CancelEditing);
}
1981
/// Returns the thread-level feedback, if any has been recorded.
pub fn feedback(&self) -> Option<ThreadFeedback> {
    self.feedback
}
1985
/// Returns the feedback recorded for the message with `message_id`, if any.
pub fn message_feedback(&self, message_id: MessageId) -> Option<ThreadFeedback> {
    self.message_feedback.get(&message_id).copied()
}
1989
1990 pub fn report_message_feedback(
1991 &mut self,
1992 message_id: MessageId,
1993 feedback: ThreadFeedback,
1994 cx: &mut Context<Self>,
1995 ) -> Task<Result<()>> {
1996 if self.message_feedback.get(&message_id) == Some(&feedback) {
1997 return Task::ready(Ok(()));
1998 }
1999
2000 let final_project_snapshot = Self::project_snapshot(self.project.clone(), cx);
2001 let serialized_thread = self.serialize(cx);
2002 let thread_id = self.id().clone();
2003 let client = self.project.read(cx).client();
2004
2005 let enabled_tool_names: Vec<String> = self
2006 .tools()
2007 .read(cx)
2008 .enabled_tools(cx)
2009 .iter()
2010 .map(|tool| tool.name().to_string())
2011 .collect();
2012
2013 self.message_feedback.insert(message_id, feedback);
2014
2015 cx.notify();
2016
2017 let message_content = self
2018 .message(message_id)
2019 .map(|msg| msg.to_string())
2020 .unwrap_or_default();
2021
2022 cx.background_spawn(async move {
2023 let final_project_snapshot = final_project_snapshot.await;
2024 let serialized_thread = serialized_thread.await?;
2025 let thread_data =
2026 serde_json::to_value(serialized_thread).unwrap_or_else(|_| serde_json::Value::Null);
2027
2028 let rating = match feedback {
2029 ThreadFeedback::Positive => "positive",
2030 ThreadFeedback::Negative => "negative",
2031 };
2032 telemetry::event!(
2033 "Assistant Thread Rated",
2034 rating,
2035 thread_id,
2036 enabled_tool_names,
2037 message_id = message_id.0,
2038 message_content,
2039 thread_data,
2040 final_project_snapshot
2041 );
2042 client.telemetry().flush_events().await;
2043
2044 Ok(())
2045 })
2046 }
2047
2048 pub fn report_feedback(
2049 &mut self,
2050 feedback: ThreadFeedback,
2051 cx: &mut Context<Self>,
2052 ) -> Task<Result<()>> {
2053 let last_assistant_message_id = self
2054 .messages
2055 .iter()
2056 .rev()
2057 .find(|msg| msg.role == Role::Assistant)
2058 .map(|msg| msg.id);
2059
2060 if let Some(message_id) = last_assistant_message_id {
2061 self.report_message_feedback(message_id, feedback, cx)
2062 } else {
2063 let final_project_snapshot = Self::project_snapshot(self.project.clone(), cx);
2064 let serialized_thread = self.serialize(cx);
2065 let thread_id = self.id().clone();
2066 let client = self.project.read(cx).client();
2067 self.feedback = Some(feedback);
2068 cx.notify();
2069
2070 cx.background_spawn(async move {
2071 let final_project_snapshot = final_project_snapshot.await;
2072 let serialized_thread = serialized_thread.await?;
2073 let thread_data = serde_json::to_value(serialized_thread)
2074 .unwrap_or_else(|_| serde_json::Value::Null);
2075
2076 let rating = match feedback {
2077 ThreadFeedback::Positive => "positive",
2078 ThreadFeedback::Negative => "negative",
2079 };
2080 telemetry::event!(
2081 "Assistant Thread Rated",
2082 rating,
2083 thread_id,
2084 thread_data,
2085 final_project_snapshot
2086 );
2087 client.telemetry().flush_events().await;
2088
2089 Ok(())
2090 })
2091 }
2092 }
2093
2094 /// Create a snapshot of the current project state including git information and unsaved buffers.
2095 fn project_snapshot(
2096 project: Entity<Project>,
2097 cx: &mut Context<Self>,
2098 ) -> Task<Arc<ProjectSnapshot>> {
2099 let git_store = project.read(cx).git_store().clone();
2100 let worktree_snapshots: Vec<_> = project
2101 .read(cx)
2102 .visible_worktrees(cx)
2103 .map(|worktree| Self::worktree_snapshot(worktree, git_store.clone(), cx))
2104 .collect();
2105
2106 cx.spawn(async move |_, cx| {
2107 let worktree_snapshots = futures::future::join_all(worktree_snapshots).await;
2108
2109 let mut unsaved_buffers = Vec::new();
2110 cx.update(|app_cx| {
2111 let buffer_store = project.read(app_cx).buffer_store();
2112 for buffer_handle in buffer_store.read(app_cx).buffers() {
2113 let buffer = buffer_handle.read(app_cx);
2114 if buffer.is_dirty() {
2115 if let Some(file) = buffer.file() {
2116 let path = file.path().to_string_lossy().to_string();
2117 unsaved_buffers.push(path);
2118 }
2119 }
2120 }
2121 })
2122 .ok();
2123
2124 Arc::new(ProjectSnapshot {
2125 worktree_snapshots,
2126 unsaved_buffer_paths: unsaved_buffers,
2127 timestamp: Utc::now(),
2128 })
2129 })
2130 }
2131
2132 fn worktree_snapshot(
2133 worktree: Entity<project::Worktree>,
2134 git_store: Entity<GitStore>,
2135 cx: &App,
2136 ) -> Task<WorktreeSnapshot> {
2137 cx.spawn(async move |cx| {
2138 // Get worktree path and snapshot
2139 let worktree_info = cx.update(|app_cx| {
2140 let worktree = worktree.read(app_cx);
2141 let path = worktree.abs_path().to_string_lossy().to_string();
2142 let snapshot = worktree.snapshot();
2143 (path, snapshot)
2144 });
2145
2146 let Ok((worktree_path, _snapshot)) = worktree_info else {
2147 return WorktreeSnapshot {
2148 worktree_path: String::new(),
2149 git_state: None,
2150 };
2151 };
2152
2153 let git_state = git_store
2154 .update(cx, |git_store, cx| {
2155 git_store
2156 .repositories()
2157 .values()
2158 .find(|repo| {
2159 repo.read(cx)
2160 .abs_path_to_repo_path(&worktree.read(cx).abs_path())
2161 .is_some()
2162 })
2163 .cloned()
2164 })
2165 .ok()
2166 .flatten()
2167 .map(|repo| {
2168 repo.update(cx, |repo, _| {
2169 let current_branch =
2170 repo.branch.as_ref().map(|branch| branch.name.to_string());
2171 repo.send_job(None, |state, _| async move {
2172 let RepositoryState::Local { backend, .. } = state else {
2173 return GitState {
2174 remote_url: None,
2175 head_sha: None,
2176 current_branch,
2177 diff: None,
2178 };
2179 };
2180
2181 let remote_url = backend.remote_url("origin");
2182 let head_sha = backend.head_sha().await;
2183 let diff = backend.diff(DiffType::HeadToWorktree).await.ok();
2184
2185 GitState {
2186 remote_url,
2187 head_sha,
2188 current_branch,
2189 diff,
2190 }
2191 })
2192 })
2193 });
2194
2195 let git_state = match git_state {
2196 Some(git_state) => match git_state.ok() {
2197 Some(git_state) => git_state.await.ok(),
2198 None => None,
2199 },
2200 None => None,
2201 };
2202
2203 WorktreeSnapshot {
2204 worktree_path,
2205 git_state,
2206 }
2207 })
2208 }
2209
2210 pub fn to_markdown(&self, cx: &App) -> Result<String> {
2211 let mut markdown = Vec::new();
2212
2213 if let Some(summary) = self.summary() {
2214 writeln!(markdown, "# {summary}\n")?;
2215 };
2216
2217 for message in self.messages() {
2218 writeln!(
2219 markdown,
2220 "## {role}\n",
2221 role = match message.role {
2222 Role::User => "User",
2223 Role::Assistant => "Assistant",
2224 Role::System => "System",
2225 }
2226 )?;
2227
2228 if !message.loaded_context.text.is_empty() {
2229 writeln!(markdown, "{}", message.loaded_context.text)?;
2230 }
2231
2232 if !message.loaded_context.images.is_empty() {
2233 writeln!(
2234 markdown,
2235 "\n{} images attached as context.\n",
2236 message.loaded_context.images.len()
2237 )?;
2238 }
2239
2240 for segment in &message.segments {
2241 match segment {
2242 MessageSegment::Text(text) => writeln!(markdown, "{}\n", text)?,
2243 MessageSegment::Thinking { text, .. } => {
2244 writeln!(markdown, "<think>\n{}\n</think>\n", text)?
2245 }
2246 MessageSegment::RedactedThinking(_) => {}
2247 }
2248 }
2249
2250 for tool_use in self.tool_uses_for_message(message.id, cx) {
2251 writeln!(
2252 markdown,
2253 "**Use Tool: {} ({})**",
2254 tool_use.name, tool_use.id
2255 )?;
2256 writeln!(markdown, "```json")?;
2257 writeln!(
2258 markdown,
2259 "{}",
2260 serde_json::to_string_pretty(&tool_use.input)?
2261 )?;
2262 writeln!(markdown, "```")?;
2263 }
2264
2265 for tool_result in self.tool_results_for_message(message.id) {
2266 write!(markdown, "\n**Tool Results: {}", tool_result.tool_use_id)?;
2267 if tool_result.is_error {
2268 write!(markdown, " (Error)")?;
2269 }
2270
2271 writeln!(markdown, "**\n")?;
2272 writeln!(markdown, "{}", tool_result.content)?;
2273 }
2274 }
2275
2276 Ok(String::from_utf8_lossy(&markdown).to_string())
2277 }
2278
2279 pub fn keep_edits_in_range(
2280 &mut self,
2281 buffer: Entity<language::Buffer>,
2282 buffer_range: Range<language::Anchor>,
2283 cx: &mut Context<Self>,
2284 ) {
2285 self.action_log.update(cx, |action_log, cx| {
2286 action_log.keep_edits_in_range(buffer, buffer_range, cx)
2287 });
2288 }
2289
2290 pub fn keep_all_edits(&mut self, cx: &mut Context<Self>) {
2291 self.action_log
2292 .update(cx, |action_log, cx| action_log.keep_all_edits(cx));
2293 }
2294
2295 pub fn reject_edits_in_ranges(
2296 &mut self,
2297 buffer: Entity<language::Buffer>,
2298 buffer_ranges: Vec<Range<language::Anchor>>,
2299 cx: &mut Context<Self>,
2300 ) -> Task<Result<()>> {
2301 self.action_log.update(cx, |action_log, cx| {
2302 action_log.reject_edits_in_ranges(buffer, buffer_ranges, cx)
2303 })
2304 }
2305
2306 pub fn action_log(&self) -> &Entity<ActionLog> {
2307 &self.action_log
2308 }
2309
2310 pub fn project(&self) -> &Entity<Project> {
2311 &self.project
2312 }
2313
2314 pub fn auto_capture_telemetry(&mut self, cx: &mut Context<Self>) {
2315 if !cx.has_flag::<feature_flags::ThreadAutoCaptureFeatureFlag>() {
2316 return;
2317 }
2318
2319 let now = Instant::now();
2320 if let Some(last) = self.last_auto_capture_at {
2321 if now.duration_since(last).as_secs() < 10 {
2322 return;
2323 }
2324 }
2325
2326 self.last_auto_capture_at = Some(now);
2327
2328 let thread_id = self.id().clone();
2329 let github_login = self
2330 .project
2331 .read(cx)
2332 .user_store()
2333 .read(cx)
2334 .current_user()
2335 .map(|user| user.github_login.clone());
2336 let client = self.project.read(cx).client().clone();
2337 let serialize_task = self.serialize(cx);
2338
2339 cx.background_executor()
2340 .spawn(async move {
2341 if let Ok(serialized_thread) = serialize_task.await {
2342 if let Ok(thread_data) = serde_json::to_value(serialized_thread) {
2343 telemetry::event!(
2344 "Agent Thread Auto-Captured",
2345 thread_id = thread_id.to_string(),
2346 thread_data = thread_data,
2347 auto_capture_reason = "tracked_user",
2348 github_login = github_login
2349 );
2350
2351 client.telemetry().flush_events().await;
2352 }
2353 }
2354 })
2355 .detach();
2356 }
2357
2358 pub fn cumulative_token_usage(&self) -> TokenUsage {
2359 self.cumulative_token_usage
2360 }
2361
2362 pub fn token_usage_up_to_message(&self, message_id: MessageId) -> TotalTokenUsage {
2363 let Some(model) = self.configured_model.as_ref() else {
2364 return TotalTokenUsage::default();
2365 };
2366
2367 let max = model.model.max_token_count();
2368
2369 let index = self
2370 .messages
2371 .iter()
2372 .position(|msg| msg.id == message_id)
2373 .unwrap_or(0);
2374
2375 if index == 0 {
2376 return TotalTokenUsage { total: 0, max };
2377 }
2378
2379 let token_usage = &self
2380 .request_token_usage
2381 .get(index - 1)
2382 .cloned()
2383 .unwrap_or_default();
2384
2385 TotalTokenUsage {
2386 total: token_usage.total_tokens() as usize,
2387 max,
2388 }
2389 }
2390
2391 pub fn total_token_usage(&self) -> Option<TotalTokenUsage> {
2392 let model = self.configured_model.as_ref()?;
2393
2394 let max = model.model.max_token_count();
2395
2396 if let Some(exceeded_error) = &self.exceeded_window_error {
2397 if model.model.id() == exceeded_error.model_id {
2398 return Some(TotalTokenUsage {
2399 total: exceeded_error.token_count,
2400 max,
2401 });
2402 }
2403 }
2404
2405 let total = self
2406 .token_usage_at_last_message()
2407 .unwrap_or_default()
2408 .total_tokens() as usize;
2409
2410 Some(TotalTokenUsage { total, max })
2411 }
2412
2413 fn token_usage_at_last_message(&self) -> Option<TokenUsage> {
2414 self.request_token_usage
2415 .get(self.messages.len().saturating_sub(1))
2416 .or_else(|| self.request_token_usage.last())
2417 .cloned()
2418 }
2419
2420 fn update_token_usage_at_last_message(&mut self, token_usage: TokenUsage) {
2421 let placeholder = self.token_usage_at_last_message().unwrap_or_default();
2422 self.request_token_usage
2423 .resize(self.messages.len(), placeholder);
2424
2425 if let Some(last) = self.request_token_usage.last_mut() {
2426 *last = token_usage;
2427 }
2428 }
2429
2430 pub fn deny_tool_use(
2431 &mut self,
2432 tool_use_id: LanguageModelToolUseId,
2433 tool_name: Arc<str>,
2434 window: Option<AnyWindowHandle>,
2435 cx: &mut Context<Self>,
2436 ) {
2437 let err = Err(anyhow::anyhow!(
2438 "Permission to run tool action denied by user"
2439 ));
2440
2441 self.tool_use.insert_tool_output(
2442 tool_use_id.clone(),
2443 tool_name,
2444 err,
2445 self.configured_model.as_ref(),
2446 );
2447 self.tool_finished(tool_use_id.clone(), None, true, window, cx);
2448 }
2449}
2450
/// Errors carried by [`ThreadEvent::ShowError`].
///
/// The `#[error(...)]` attributes supply each variant's `Display` text.
#[derive(Debug, Clone, Error)]
pub enum ThreadError {
    /// Displays as "Payment required".
    #[error("Payment required")]
    PaymentRequired,
    /// Displays as "Max monthly spend reached".
    #[error("Max monthly spend reached")]
    MaxMonthlySpendReached,
    /// Displays as "Model request limit reached"; carries the associated `Plan`.
    #[error("Model request limit reached")]
    ModelRequestLimitReached { plan: Plan },
    /// A free-form error made of a header and a message, displayed as
    /// "Message {header}: {message}".
    #[error("Message {header}: {message}")]
    Message {
        header: SharedString,
        message: SharedString,
    },
}
2465
/// Events emitted by `Thread` (via its `EventEmitter<ThreadEvent>` impl).
///
/// NOTE(review): the per-variant notes below only describe the payloads
/// visible here; when each event fires is determined by code outside this
/// declaration — confirm at the emit sites.
#[derive(Debug, Clone)]
pub enum ThreadEvent {
    /// Carries the [`ThreadError`] to report.
    ShowError(ThreadError),
    /// Carries a `RequestUsage` value.
    UsageUpdated(RequestUsage),
    StreamedCompletion,
    ReceivedTextChunk,
    NewRequest,
    /// Carries a message id and a chunk of text for that message.
    StreamedAssistantText(MessageId, String),
    /// Carries a message id and a chunk of thinking text for that message.
    StreamedAssistantThinking(MessageId, String),
    /// Carries the tool use's id, its UI text, and its JSON input.
    StreamedToolUse {
        tool_use_id: LanguageModelToolUseId,
        ui_text: Arc<str>,
        input: serde_json::Value,
    },
    /// Carries the tool use's id, its UI text, and the input JSON that was
    /// found to be invalid.
    InvalidToolInput {
        tool_use_id: LanguageModelToolUseId,
        ui_text: Arc<str>,
        invalid_input_json: Arc<str>,
    },
    /// Carries either the `StopReason` or the error the completion ended with.
    Stopped(Result<StopReason, Arc<anyhow::Error>>),
    /// Carries the id of the affected message.
    MessageAdded(MessageId),
    /// Carries the id of the affected message.
    MessageEdited(MessageId),
    /// Carries the id of the affected message.
    MessageDeleted(MessageId),
    SummaryGenerated,
    SummaryChanged,
    /// Carries the pending tool uses.
    UsePendingTools {
        tool_uses: Vec<PendingToolUse>,
    },
    /// Carries the finished tool use's id and, when available, its pending
    /// tool use record.
    ToolFinished {
        #[allow(unused)]
        tool_use_id: LanguageModelToolUseId,
        /// The pending tool use that corresponds to this tool.
        pending_tool_use: Option<PendingToolUse>,
    },
    CheckpointChanged,
    ToolConfirmationNeeded,
    CancelEditing,
    CompletionCanceled,
}
2505
// Declares `Thread` as an emitter of `ThreadEvent`s.
impl EventEmitter<ThreadEvent> for Thread {}
2507
/// Pairs a completion's numeric id with the task associated with it.
struct PendingCompletion {
    /// Numeric identifier of this pending completion.
    id: usize,
    // NOTE(review): the underscore prefix marks this field as never read;
    // presumably it is held only to keep the task alive while the completion
    // is pending — confirm against gpui `Task` drop semantics.
    _task: Task<()>,
}
2512
2513#[cfg(test)]
2514mod tests {
2515 use super::*;
2516 use crate::{ThreadStore, context::load_context, context_store::ContextStore, thread_store};
2517 use assistant_settings::AssistantSettings;
2518 use assistant_tool::ToolRegistry;
2519 use context_server::ContextServerSettings;
2520 use editor::EditorSettings;
2521 use gpui::TestAppContext;
2522 use language_model::fake_provider::FakeLanguageModel;
2523 use project::{FakeFs, Project};
2524 use prompt_store::PromptBuilder;
2525 use serde_json::json;
2526 use settings::{Settings, SettingsStore};
2527 use std::sync::Arc;
2528 use theme::ThemeSettings;
2529 use util::path;
2530 use workspace::Workspace;
2531
2532 #[gpui::test]
2533 async fn test_message_with_context(cx: &mut TestAppContext) {
2534 init_test_settings(cx);
2535
2536 let project = create_test_project(
2537 cx,
2538 json!({"code.rs": "fn main() {\n println!(\"Hello, world!\");\n}"}),
2539 )
2540 .await;
2541
2542 let (_workspace, _thread_store, thread, context_store, model) =
2543 setup_test_environment(cx, project.clone()).await;
2544
2545 add_file_to_context(&project, &context_store, "test/code.rs", cx)
2546 .await
2547 .unwrap();
2548
2549 let context = context_store.update(cx, |store, _| store.context().next().cloned().unwrap());
2550 let loaded_context = cx
2551 .update(|cx| load_context(vec![context], &project, &None, cx))
2552 .await;
2553
2554 // Insert user message with context
2555 let message_id = thread.update(cx, |thread, cx| {
2556 thread.insert_user_message(
2557 "Please explain this code",
2558 loaded_context,
2559 None,
2560 Vec::new(),
2561 cx,
2562 )
2563 });
2564
2565 // Check content and context in message object
2566 let message = thread.read_with(cx, |thread, _| thread.message(message_id).unwrap().clone());
2567
2568 // Use different path format strings based on platform for the test
2569 #[cfg(windows)]
2570 let path_part = r"test\code.rs";
2571 #[cfg(not(windows))]
2572 let path_part = "test/code.rs";
2573
2574 let expected_context = format!(
2575 r#"
2576<context>
2577The following items were attached by the user. You don't need to use other tools to read them.
2578
2579<files>
2580```rs {path_part}
2581fn main() {{
2582 println!("Hello, world!");
2583}}
2584```
2585</files>
2586</context>
2587"#
2588 );
2589
2590 assert_eq!(message.role, Role::User);
2591 assert_eq!(message.segments.len(), 1);
2592 assert_eq!(
2593 message.segments[0],
2594 MessageSegment::Text("Please explain this code".to_string())
2595 );
2596 assert_eq!(message.loaded_context.text, expected_context);
2597
2598 // Check message in request
2599 let request = thread.update(cx, |thread, cx| {
2600 thread.to_completion_request(model.clone(), cx)
2601 });
2602
2603 assert_eq!(request.messages.len(), 2);
2604 let expected_full_message = format!("{}Please explain this code", expected_context);
2605 assert_eq!(request.messages[1].string_contents(), expected_full_message);
2606 }
2607
2608 #[gpui::test]
2609 async fn test_only_include_new_contexts(cx: &mut TestAppContext) {
2610 init_test_settings(cx);
2611
2612 let project = create_test_project(
2613 cx,
2614 json!({
2615 "file1.rs": "fn function1() {}\n",
2616 "file2.rs": "fn function2() {}\n",
2617 "file3.rs": "fn function3() {}\n",
2618 "file4.rs": "fn function4() {}\n",
2619 }),
2620 )
2621 .await;
2622
2623 let (_, _thread_store, thread, context_store, model) =
2624 setup_test_environment(cx, project.clone()).await;
2625
2626 // First message with context 1
2627 add_file_to_context(&project, &context_store, "test/file1.rs", cx)
2628 .await
2629 .unwrap();
2630 let new_contexts = context_store.update(cx, |store, cx| {
2631 store.new_context_for_thread(thread.read(cx), None)
2632 });
2633 assert_eq!(new_contexts.len(), 1);
2634 let loaded_context = cx
2635 .update(|cx| load_context(new_contexts, &project, &None, cx))
2636 .await;
2637 let message1_id = thread.update(cx, |thread, cx| {
2638 thread.insert_user_message("Message 1", loaded_context, None, Vec::new(), cx)
2639 });
2640
2641 // Second message with contexts 1 and 2 (context 1 should be skipped as it's already included)
2642 add_file_to_context(&project, &context_store, "test/file2.rs", cx)
2643 .await
2644 .unwrap();
2645 let new_contexts = context_store.update(cx, |store, cx| {
2646 store.new_context_for_thread(thread.read(cx), None)
2647 });
2648 assert_eq!(new_contexts.len(), 1);
2649 let loaded_context = cx
2650 .update(|cx| load_context(new_contexts, &project, &None, cx))
2651 .await;
2652 let message2_id = thread.update(cx, |thread, cx| {
2653 thread.insert_user_message("Message 2", loaded_context, None, Vec::new(), cx)
2654 });
2655
2656 // Third message with all three contexts (contexts 1 and 2 should be skipped)
2657 //
2658 add_file_to_context(&project, &context_store, "test/file3.rs", cx)
2659 .await
2660 .unwrap();
2661 let new_contexts = context_store.update(cx, |store, cx| {
2662 store.new_context_for_thread(thread.read(cx), None)
2663 });
2664 assert_eq!(new_contexts.len(), 1);
2665 let loaded_context = cx
2666 .update(|cx| load_context(new_contexts, &project, &None, cx))
2667 .await;
2668 let message3_id = thread.update(cx, |thread, cx| {
2669 thread.insert_user_message("Message 3", loaded_context, None, Vec::new(), cx)
2670 });
2671
2672 // Check what contexts are included in each message
2673 let (message1, message2, message3) = thread.read_with(cx, |thread, _| {
2674 (
2675 thread.message(message1_id).unwrap().clone(),
2676 thread.message(message2_id).unwrap().clone(),
2677 thread.message(message3_id).unwrap().clone(),
2678 )
2679 });
2680
2681 // First message should include context 1
2682 assert!(message1.loaded_context.text.contains("file1.rs"));
2683
2684 // Second message should include only context 2 (not 1)
2685 assert!(!message2.loaded_context.text.contains("file1.rs"));
2686 assert!(message2.loaded_context.text.contains("file2.rs"));
2687
2688 // Third message should include only context 3 (not 1 or 2)
2689 assert!(!message3.loaded_context.text.contains("file1.rs"));
2690 assert!(!message3.loaded_context.text.contains("file2.rs"));
2691 assert!(message3.loaded_context.text.contains("file3.rs"));
2692
2693 // Check entire request to make sure all contexts are properly included
2694 let request = thread.update(cx, |thread, cx| {
2695 thread.to_completion_request(model.clone(), cx)
2696 });
2697
2698 // The request should contain all 3 messages
2699 assert_eq!(request.messages.len(), 4);
2700
2701 // Check that the contexts are properly formatted in each message
2702 assert!(request.messages[1].string_contents().contains("file1.rs"));
2703 assert!(!request.messages[1].string_contents().contains("file2.rs"));
2704 assert!(!request.messages[1].string_contents().contains("file3.rs"));
2705
2706 assert!(!request.messages[2].string_contents().contains("file1.rs"));
2707 assert!(request.messages[2].string_contents().contains("file2.rs"));
2708 assert!(!request.messages[2].string_contents().contains("file3.rs"));
2709
2710 assert!(!request.messages[3].string_contents().contains("file1.rs"));
2711 assert!(!request.messages[3].string_contents().contains("file2.rs"));
2712 assert!(request.messages[3].string_contents().contains("file3.rs"));
2713
2714 add_file_to_context(&project, &context_store, "test/file4.rs", cx)
2715 .await
2716 .unwrap();
2717 let new_contexts = context_store.update(cx, |store, cx| {
2718 store.new_context_for_thread(thread.read(cx), Some(message2_id))
2719 });
2720 assert_eq!(new_contexts.len(), 3);
2721 let loaded_context = cx
2722 .update(|cx| load_context(new_contexts, &project, &None, cx))
2723 .await
2724 .loaded_context;
2725
2726 assert!(!loaded_context.text.contains("file1.rs"));
2727 assert!(loaded_context.text.contains("file2.rs"));
2728 assert!(loaded_context.text.contains("file3.rs"));
2729 assert!(loaded_context.text.contains("file4.rs"));
2730
2731 let new_contexts = context_store.update(cx, |store, cx| {
2732 // Remove file4.rs
2733 store.remove_context(&loaded_context.contexts[2].handle(), cx);
2734 store.new_context_for_thread(thread.read(cx), Some(message2_id))
2735 });
2736 assert_eq!(new_contexts.len(), 2);
2737 let loaded_context = cx
2738 .update(|cx| load_context(new_contexts, &project, &None, cx))
2739 .await
2740 .loaded_context;
2741
2742 assert!(!loaded_context.text.contains("file1.rs"));
2743 assert!(loaded_context.text.contains("file2.rs"));
2744 assert!(loaded_context.text.contains("file3.rs"));
2745 assert!(!loaded_context.text.contains("file4.rs"));
2746
2747 let new_contexts = context_store.update(cx, |store, cx| {
2748 // Remove file3.rs
2749 store.remove_context(&loaded_context.contexts[1].handle(), cx);
2750 store.new_context_for_thread(thread.read(cx), Some(message2_id))
2751 });
2752 assert_eq!(new_contexts.len(), 1);
2753 let loaded_context = cx
2754 .update(|cx| load_context(new_contexts, &project, &None, cx))
2755 .await
2756 .loaded_context;
2757
2758 assert!(!loaded_context.text.contains("file1.rs"));
2759 assert!(loaded_context.text.contains("file2.rs"));
2760 assert!(!loaded_context.text.contains("file3.rs"));
2761 assert!(!loaded_context.text.contains("file4.rs"));
2762 }
2763
2764 #[gpui::test]
2765 async fn test_message_without_files(cx: &mut TestAppContext) {
2766 init_test_settings(cx);
2767
2768 let project = create_test_project(
2769 cx,
2770 json!({"code.rs": "fn main() {\n println!(\"Hello, world!\");\n}"}),
2771 )
2772 .await;
2773
2774 let (_, _thread_store, thread, _context_store, model) =
2775 setup_test_environment(cx, project.clone()).await;
2776
2777 // Insert user message without any context (empty context vector)
2778 let message_id = thread.update(cx, |thread, cx| {
2779 thread.insert_user_message(
2780 "What is the best way to learn Rust?",
2781 ContextLoadResult::default(),
2782 None,
2783 Vec::new(),
2784 cx,
2785 )
2786 });
2787
2788 // Check content and context in message object
2789 let message = thread.read_with(cx, |thread, _| thread.message(message_id).unwrap().clone());
2790
2791 // Context should be empty when no files are included
2792 assert_eq!(message.role, Role::User);
2793 assert_eq!(message.segments.len(), 1);
2794 assert_eq!(
2795 message.segments[0],
2796 MessageSegment::Text("What is the best way to learn Rust?".to_string())
2797 );
2798 assert_eq!(message.loaded_context.text, "");
2799
2800 // Check message in request
2801 let request = thread.update(cx, |thread, cx| {
2802 thread.to_completion_request(model.clone(), cx)
2803 });
2804
2805 assert_eq!(request.messages.len(), 2);
2806 assert_eq!(
2807 request.messages[1].string_contents(),
2808 "What is the best way to learn Rust?"
2809 );
2810
2811 // Add second message, also without context
2812 let message2_id = thread.update(cx, |thread, cx| {
2813 thread.insert_user_message(
2814 "Are there any good books?",
2815 ContextLoadResult::default(),
2816 None,
2817 Vec::new(),
2818 cx,
2819 )
2820 });
2821
2822 let message2 =
2823 thread.read_with(cx, |thread, _| thread.message(message2_id).unwrap().clone());
2824 assert_eq!(message2.loaded_context.text, "");
2825
2826 // Check that both messages appear in the request
2827 let request = thread.update(cx, |thread, cx| {
2828 thread.to_completion_request(model.clone(), cx)
2829 });
2830
2831 assert_eq!(request.messages.len(), 3);
2832 assert_eq!(
2833 request.messages[1].string_contents(),
2834 "What is the best way to learn Rust?"
2835 );
2836 assert_eq!(
2837 request.messages[2].string_contents(),
2838 "Are there any good books?"
2839 );
2840 }
2841
2842 #[gpui::test]
2843 async fn test_stale_buffer_notification(cx: &mut TestAppContext) {
2844 init_test_settings(cx);
2845
2846 let project = create_test_project(
2847 cx,
2848 json!({"code.rs": "fn main() {\n println!(\"Hello, world!\");\n}"}),
2849 )
2850 .await;
2851
2852 let (_workspace, _thread_store, thread, context_store, model) =
2853 setup_test_environment(cx, project.clone()).await;
2854
2855 // Open buffer and add it to context
2856 let buffer = add_file_to_context(&project, &context_store, "test/code.rs", cx)
2857 .await
2858 .unwrap();
2859
2860 let context = context_store.update(cx, |store, _| store.context().next().cloned().unwrap());
2861 let loaded_context = cx
2862 .update(|cx| load_context(vec![context], &project, &None, cx))
2863 .await;
2864
2865 // Insert user message with the buffer as context
2866 thread.update(cx, |thread, cx| {
2867 thread.insert_user_message("Explain this code", loaded_context, None, Vec::new(), cx)
2868 });
2869
2870 // Create a request and check that it doesn't have a stale buffer warning yet
2871 let initial_request = thread.update(cx, |thread, cx| {
2872 thread.to_completion_request(model.clone(), cx)
2873 });
2874
2875 // Make sure we don't have a stale file warning yet
2876 let has_stale_warning = initial_request.messages.iter().any(|msg| {
2877 msg.string_contents()
2878 .contains("These files changed since last read:")
2879 });
2880 assert!(
2881 !has_stale_warning,
2882 "Should not have stale buffer warning before buffer is modified"
2883 );
2884
2885 // Modify the buffer
2886 buffer.update(cx, |buffer, cx| {
2887 // Find a position at the end of line 1
2888 buffer.edit(
2889 [(1..1, "\n println!(\"Added a new line\");\n")],
2890 None,
2891 cx,
2892 );
2893 });
2894
2895 // Insert another user message without context
2896 thread.update(cx, |thread, cx| {
2897 thread.insert_user_message(
2898 "What does the code do now?",
2899 ContextLoadResult::default(),
2900 None,
2901 Vec::new(),
2902 cx,
2903 )
2904 });
2905
2906 // Create a new request and check for the stale buffer warning
2907 let new_request = thread.update(cx, |thread, cx| {
2908 thread.to_completion_request(model.clone(), cx)
2909 });
2910
2911 // We should have a stale file warning as the last message
2912 let last_message = new_request
2913 .messages
2914 .last()
2915 .expect("Request should have messages");
2916
2917 // The last message should be the stale buffer notification
2918 assert_eq!(last_message.role, Role::User);
2919
2920 // Check the exact content of the message
2921 let expected_content = "These files changed since last read:\n- code.rs\n";
2922 assert_eq!(
2923 last_message.string_contents(),
2924 expected_content,
2925 "Last message should be exactly the stale buffer notification"
2926 );
2927 }
2928
2929 fn init_test_settings(cx: &mut TestAppContext) {
2930 cx.update(|cx| {
2931 let settings_store = SettingsStore::test(cx);
2932 cx.set_global(settings_store);
2933 language::init(cx);
2934 Project::init_settings(cx);
2935 AssistantSettings::register(cx);
2936 prompt_store::init(cx);
2937 thread_store::init(cx);
2938 workspace::init_settings(cx);
2939 language_model::init_settings(cx);
2940 ThemeSettings::register(cx);
2941 ContextServerSettings::register(cx);
2942 EditorSettings::register(cx);
2943 ToolRegistry::default_global(cx);
2944 });
2945 }
2946
2947 // Helper to create a test project with test files
2948 async fn create_test_project(
2949 cx: &mut TestAppContext,
2950 files: serde_json::Value,
2951 ) -> Entity<Project> {
2952 let fs = FakeFs::new(cx.executor());
2953 fs.insert_tree(path!("/test"), files).await;
2954 Project::test(fs, [path!("/test").as_ref()], cx).await
2955 }
2956
2957 async fn setup_test_environment(
2958 cx: &mut TestAppContext,
2959 project: Entity<Project>,
2960 ) -> (
2961 Entity<Workspace>,
2962 Entity<ThreadStore>,
2963 Entity<Thread>,
2964 Entity<ContextStore>,
2965 Arc<dyn LanguageModel>,
2966 ) {
2967 let (workspace, cx) =
2968 cx.add_window_view(|window, cx| Workspace::test_new(project.clone(), window, cx));
2969
2970 let thread_store = cx
2971 .update(|_, cx| {
2972 ThreadStore::load(
2973 project.clone(),
2974 cx.new(|_| ToolWorkingSet::default()),
2975 None,
2976 Arc::new(PromptBuilder::new(None).unwrap()),
2977 cx,
2978 )
2979 })
2980 .await
2981 .unwrap();
2982
2983 let thread = thread_store.update(cx, |store, cx| store.create_thread(cx));
2984 let context_store = cx.new(|_cx| ContextStore::new(project.downgrade(), None));
2985
2986 let model = FakeLanguageModel::default();
2987 let model: Arc<dyn LanguageModel> = Arc::new(model);
2988
2989 (workspace, thread_store, thread, context_store, model)
2990 }
2991
2992 async fn add_file_to_context(
2993 project: &Entity<Project>,
2994 context_store: &Entity<ContextStore>,
2995 path: &str,
2996 cx: &mut TestAppContext,
2997 ) -> Result<Entity<language::Buffer>> {
2998 let buffer_path = project
2999 .read_with(cx, |project, cx| project.find_project_path(path, cx))
3000 .unwrap();
3001
3002 let buffer = project
3003 .update(cx, |project, cx| {
3004 project.open_buffer(buffer_path.clone(), cx)
3005 })
3006 .await
3007 .unwrap();
3008
3009 context_store.update(cx, |context_store, cx| {
3010 context_store.add_file_from_buffer(&buffer_path, buffer.clone(), false, cx);
3011 });
3012
3013 Ok(buffer)
3014 }
3015}