1use crate::{
2 AssistantContext, ContextEvent, ContextId, ContextOperation, ContextVersion, SavedContext,
3 SavedContextMetadata,
4};
5use anyhow::{anyhow, Context as _, Result};
6use assistant_slash_command::{SlashCommandId, SlashCommandWorkingSet};
7use client::{proto, telemetry::Telemetry, Client, TypedEnvelope};
8use clock::ReplicaId;
9use collections::HashMap;
10use context_server::manager::ContextServerManager;
11use context_server::ContextServerFactoryRegistry;
12use fs::{Fs, RemoveOptions};
13use futures::StreamExt;
14use fuzzy::StringMatchCandidate;
15use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, WeakEntity};
16use language::LanguageRegistry;
17use paths::contexts_dir;
18use project::Project;
19use prompt_store::PromptBuilder;
20use regex::Regex;
21use rpc::AnyProtoClient;
22use std::sync::LazyLock;
23use std::{
24 cmp::Reverse,
25 ffi::OsStr,
26 mem,
27 path::{Path, PathBuf},
28 sync::Arc,
29 time::Duration,
30};
31use util::{ResultExt, TryFutureExt};
32
/// Registers the `ContextStore` RPC handlers on `client`.
///
/// The handlers that only return `Result<()>` (`handle_advertise_contexts`,
/// `handle_update_context`) are registered as entity message handlers; the ones that
/// produce a response (`handle_open_context`, `handle_create_context`,
/// `handle_synchronize_contexts`) are registered as entity request handlers.
pub(crate) fn init(client: &AnyProtoClient) {
    client.add_entity_message_handler(ContextStore::handle_advertise_contexts);
    client.add_entity_request_handler(ContextStore::handle_open_context);
    client.add_entity_request_handler(ContextStore::handle_create_context);
    client.add_entity_message_handler(ContextStore::handle_update_context);
    client.add_entity_request_handler(ContextStore::handle_synchronize_contexts);
}
40
/// Metadata about a context advertised by the host of a shared project.
///
/// Built from the entries of an incoming `proto::AdvertiseContexts` message.
#[derive(Clone)]
pub struct RemoteContextMetadata {
    /// Identifier of the advertised context.
    pub id: ContextId,
    /// Summary text of the context, if the advertisement carried one.
    pub summary: Option<String>,
}
46
/// Tracks the assistant contexts of a project: the contexts currently loaded in memory,
/// the metadata of the contexts saved on disk, and the contexts advertised by the host
/// when the project is shared.
pub struct ContextStore {
    /// Handles to the registered contexts. New handles are strong while
    /// `project_is_shared` is true and weak otherwise.
    contexts: Vec<ContextHandle>,
    /// Metadata of the saved contexts found in `contexts_dir()`; replaced on every reload.
    contexts_metadata: Vec<SavedContextMetadata>,
    /// Manager of the context servers; its events drive slash command registration.
    context_server_manager: Entity<ContextServerManager>,
    /// Slash command ids registered for each context server, keyed by server id, so
    /// they can be removed again when that server stops.
    context_server_slash_command_ids: HashMap<Arc<str>, Vec<SlashCommandId>>,
    /// Contexts most recently advertised by the host; cleared on disconnect.
    host_contexts: Vec<RemoteContextMetadata>,
    fs: Arc<dyn Fs>,
    languages: Arc<LanguageRegistry>,
    slash_commands: Arc<SlashCommandWorkingSet>,
    telemetry: Arc<Telemetry>,
    /// Task that reloads `contexts_metadata` whenever the contexts directory watcher
    /// yields an event. Held only to keep the task alive.
    _watch_updates: Task<Option<()>>,
    client: Arc<Client>,
    project: Entity<Project>,
    /// Value of `project.is_shared()` observed by the last `handle_project_shared` call.
    project_is_shared: bool,
    /// Subscription to the remote project entity; present only while the project is shared.
    client_subscription: Option<client::Subscription>,
    /// Subscriptions to project events. Held only to keep them alive.
    _project_subscriptions: Vec<gpui::Subscription>,
    prompt_builder: Arc<PromptBuilder>,
}
65
/// Events emitted by [`ContextStore`].
pub enum ContextStoreEvent {
    /// Emitted after a context was created in response to a `proto::CreateContext`
    /// request; carries the id of the new context.
    ContextCreated(ContextId),
}
69
// Lets `ContextStore` emit `ContextStoreEvent`s through `cx.emit`.
impl EventEmitter<ContextStoreEvent> for ContextStore {}
71
/// A handle to a registered context that either keeps the context alive (`Strong`)
/// or merely observes it (`Weak`).
enum ContextHandle {
    /// Does not keep the context alive; upgrading may fail.
    Weak(WeakEntity<AssistantContext>),
    /// Keeps the context alive for as long as the handle exists.
    Strong(Entity<AssistantContext>),
}
76
77impl ContextHandle {
78 fn upgrade(&self) -> Option<Entity<AssistantContext>> {
79 match self {
80 ContextHandle::Weak(weak) => weak.upgrade(),
81 ContextHandle::Strong(strong) => Some(strong.clone()),
82 }
83 }
84
85 fn downgrade(&self) -> WeakEntity<AssistantContext> {
86 match self {
87 ContextHandle::Weak(weak) => weak.clone(),
88 ContextHandle::Strong(strong) => strong.downgrade(),
89 }
90 }
91}
92
93impl ContextStore {
94 pub fn new(
95 project: Entity<Project>,
96 prompt_builder: Arc<PromptBuilder>,
97 slash_commands: Arc<SlashCommandWorkingSet>,
98 cx: &mut App,
99 ) -> Task<Result<Entity<Self>>> {
100 let fs = project.read(cx).fs().clone();
101 let languages = project.read(cx).languages().clone();
102 let telemetry = project.read(cx).client().telemetry().clone();
103 cx.spawn(|mut cx| async move {
104 const CONTEXT_WATCH_DURATION: Duration = Duration::from_millis(100);
105 let (mut events, _) = fs.watch(contexts_dir(), CONTEXT_WATCH_DURATION).await;
106
107 let this =
108 cx.new(|cx: &mut Context<Self>| {
109 let context_server_factory_registry =
110 ContextServerFactoryRegistry::default_global(cx);
111 let context_server_manager = cx.new(|cx| {
112 ContextServerManager::new(
113 context_server_factory_registry,
114 project.clone(),
115 cx,
116 )
117 });
118 let mut this = Self {
119 contexts: Vec::new(),
120 contexts_metadata: Vec::new(),
121 context_server_manager,
122 context_server_slash_command_ids: HashMap::default(),
123 host_contexts: Vec::new(),
124 fs,
125 languages,
126 slash_commands,
127 telemetry,
128 _watch_updates: cx.spawn(|this, mut cx| {
129 async move {
130 while events.next().await.is_some() {
131 this.update(&mut cx, |this, cx| this.reload(cx))?
132 .await
133 .log_err();
134 }
135 anyhow::Ok(())
136 }
137 .log_err()
138 }),
139 client_subscription: None,
140 _project_subscriptions: vec![
141 cx.subscribe(&project, Self::handle_project_event)
142 ],
143 project_is_shared: false,
144 client: project.read(cx).client(),
145 project: project.clone(),
146 prompt_builder,
147 };
148 this.handle_project_shared(project.clone(), cx);
149 this.synchronize_contexts(cx);
150 this.register_context_server_handlers(cx);
151 this.reload(cx).detach_and_log_err(cx);
152 this
153 })?;
154
155 Ok(this)
156 })
157 }
158
159 async fn handle_advertise_contexts(
160 this: Entity<Self>,
161 envelope: TypedEnvelope<proto::AdvertiseContexts>,
162 mut cx: AsyncApp,
163 ) -> Result<()> {
164 this.update(&mut cx, |this, cx| {
165 this.host_contexts = envelope
166 .payload
167 .contexts
168 .into_iter()
169 .map(|context| RemoteContextMetadata {
170 id: ContextId::from_proto(context.context_id),
171 summary: context.summary,
172 })
173 .collect();
174 cx.notify();
175 })
176 }
177
178 async fn handle_open_context(
179 this: Entity<Self>,
180 envelope: TypedEnvelope<proto::OpenContext>,
181 mut cx: AsyncApp,
182 ) -> Result<proto::OpenContextResponse> {
183 let context_id = ContextId::from_proto(envelope.payload.context_id);
184 let operations = this.update(&mut cx, |this, cx| {
185 if this.project.read(cx).is_via_collab() {
186 return Err(anyhow!("only the host contexts can be opened"));
187 }
188
189 let context = this
190 .loaded_context_for_id(&context_id, cx)
191 .context("context not found")?;
192 if context.read(cx).replica_id() != ReplicaId::default() {
193 return Err(anyhow!("context must be opened via the host"));
194 }
195
196 anyhow::Ok(
197 context
198 .read(cx)
199 .serialize_ops(&ContextVersion::default(), cx),
200 )
201 })??;
202 let operations = operations.await;
203 Ok(proto::OpenContextResponse {
204 context: Some(proto::Context { operations }),
205 })
206 }
207
208 async fn handle_create_context(
209 this: Entity<Self>,
210 _: TypedEnvelope<proto::CreateContext>,
211 mut cx: AsyncApp,
212 ) -> Result<proto::CreateContextResponse> {
213 let (context_id, operations) = this.update(&mut cx, |this, cx| {
214 if this.project.read(cx).is_via_collab() {
215 return Err(anyhow!("can only create contexts as the host"));
216 }
217
218 let context = this.create(cx);
219 let context_id = context.read(cx).id().clone();
220 cx.emit(ContextStoreEvent::ContextCreated(context_id.clone()));
221
222 anyhow::Ok((
223 context_id,
224 context
225 .read(cx)
226 .serialize_ops(&ContextVersion::default(), cx),
227 ))
228 })??;
229 let operations = operations.await;
230 Ok(proto::CreateContextResponse {
231 context_id: context_id.to_proto(),
232 context: Some(proto::Context { operations }),
233 })
234 }
235
236 async fn handle_update_context(
237 this: Entity<Self>,
238 envelope: TypedEnvelope<proto::UpdateContext>,
239 mut cx: AsyncApp,
240 ) -> Result<()> {
241 this.update(&mut cx, |this, cx| {
242 let context_id = ContextId::from_proto(envelope.payload.context_id);
243 if let Some(context) = this.loaded_context_for_id(&context_id, cx) {
244 let operation_proto = envelope.payload.operation.context("invalid operation")?;
245 let operation = ContextOperation::from_proto(operation_proto)?;
246 context.update(cx, |context, cx| context.apply_ops([operation], cx));
247 }
248 Ok(())
249 })?
250 }
251
252 async fn handle_synchronize_contexts(
253 this: Entity<Self>,
254 envelope: TypedEnvelope<proto::SynchronizeContexts>,
255 mut cx: AsyncApp,
256 ) -> Result<proto::SynchronizeContextsResponse> {
257 this.update(&mut cx, |this, cx| {
258 if this.project.read(cx).is_via_collab() {
259 return Err(anyhow!("only the host can synchronize contexts"));
260 }
261
262 let mut local_versions = Vec::new();
263 for remote_version_proto in envelope.payload.contexts {
264 let remote_version = ContextVersion::from_proto(&remote_version_proto);
265 let context_id = ContextId::from_proto(remote_version_proto.context_id);
266 if let Some(context) = this.loaded_context_for_id(&context_id, cx) {
267 let context = context.read(cx);
268 let operations = context.serialize_ops(&remote_version, cx);
269 local_versions.push(context.version(cx).to_proto(context_id.clone()));
270 let client = this.client.clone();
271 let project_id = envelope.payload.project_id;
272 cx.background_spawn(async move {
273 let operations = operations.await;
274 for operation in operations {
275 client.send(proto::UpdateContext {
276 project_id,
277 context_id: context_id.to_proto(),
278 operation: Some(operation),
279 })?;
280 }
281 anyhow::Ok(())
282 })
283 .detach_and_log_err(cx);
284 }
285 }
286
287 this.advertise_contexts(cx);
288
289 anyhow::Ok(proto::SynchronizeContextsResponse {
290 contexts: local_versions,
291 })
292 })?
293 }
294
295 fn handle_project_shared(&mut self, _: Entity<Project>, cx: &mut Context<Self>) {
296 let is_shared = self.project.read(cx).is_shared();
297 let was_shared = mem::replace(&mut self.project_is_shared, is_shared);
298 if is_shared == was_shared {
299 return;
300 }
301
302 if is_shared {
303 self.contexts.retain_mut(|context| {
304 if let Some(strong_context) = context.upgrade() {
305 *context = ContextHandle::Strong(strong_context);
306 true
307 } else {
308 false
309 }
310 });
311 let remote_id = self.project.read(cx).remote_id().unwrap();
312 self.client_subscription = self
313 .client
314 .subscribe_to_entity(remote_id)
315 .log_err()
316 .map(|subscription| subscription.set_entity(&cx.entity(), &mut cx.to_async()));
317 self.advertise_contexts(cx);
318 } else {
319 self.client_subscription = None;
320 }
321 }
322
323 fn handle_project_event(
324 &mut self,
325 project: Entity<Project>,
326 event: &project::Event,
327 cx: &mut Context<Self>,
328 ) {
329 match event {
330 project::Event::RemoteIdChanged(_) => {
331 self.handle_project_shared(project, cx);
332 }
333 project::Event::Reshared => {
334 self.advertise_contexts(cx);
335 }
336 project::Event::HostReshared | project::Event::Rejoined => {
337 self.synchronize_contexts(cx);
338 }
339 project::Event::DisconnectedFromHost => {
340 self.contexts.retain_mut(|context| {
341 if let Some(strong_context) = context.upgrade() {
342 *context = ContextHandle::Weak(context.downgrade());
343 strong_context.update(cx, |context, cx| {
344 if context.replica_id() != ReplicaId::default() {
345 context.set_capability(language::Capability::ReadOnly, cx);
346 }
347 });
348 true
349 } else {
350 false
351 }
352 });
353 self.host_contexts.clear();
354 cx.notify();
355 }
356 _ => {}
357 }
358 }
359
360 pub fn contexts(&self) -> Vec<SavedContextMetadata> {
361 let mut contexts = self.contexts_metadata.iter().cloned().collect::<Vec<_>>();
362 contexts.sort_unstable_by_key(|thread| std::cmp::Reverse(thread.mtime));
363 contexts
364 }
365
366 pub fn create(&mut self, cx: &mut Context<Self>) -> Entity<AssistantContext> {
367 let context = cx.new(|cx| {
368 AssistantContext::local(
369 self.languages.clone(),
370 Some(self.project.clone()),
371 Some(self.telemetry.clone()),
372 self.prompt_builder.clone(),
373 self.slash_commands.clone(),
374 cx,
375 )
376 });
377 self.register_context(&context, cx);
378 context
379 }
380
381 pub fn create_remote_context(
382 &mut self,
383 cx: &mut Context<Self>,
384 ) -> Task<Result<Entity<AssistantContext>>> {
385 let project = self.project.read(cx);
386 let Some(project_id) = project.remote_id() else {
387 return Task::ready(Err(anyhow!("project was not remote")));
388 };
389
390 let replica_id = project.replica_id();
391 let capability = project.capability();
392 let language_registry = self.languages.clone();
393 let project = self.project.clone();
394 let telemetry = self.telemetry.clone();
395 let prompt_builder = self.prompt_builder.clone();
396 let slash_commands = self.slash_commands.clone();
397 let request = self.client.request(proto::CreateContext { project_id });
398 cx.spawn(|this, mut cx| async move {
399 let response = request.await?;
400 let context_id = ContextId::from_proto(response.context_id);
401 let context_proto = response.context.context("invalid context")?;
402 let context = cx.new(|cx| {
403 AssistantContext::new(
404 context_id.clone(),
405 replica_id,
406 capability,
407 language_registry,
408 prompt_builder,
409 slash_commands,
410 Some(project),
411 Some(telemetry),
412 cx,
413 )
414 })?;
415 let operations = cx
416 .background_spawn(async move {
417 context_proto
418 .operations
419 .into_iter()
420 .map(ContextOperation::from_proto)
421 .collect::<Result<Vec<_>>>()
422 })
423 .await?;
424 context.update(&mut cx, |context, cx| context.apply_ops(operations, cx))?;
425 this.update(&mut cx, |this, cx| {
426 if let Some(existing_context) = this.loaded_context_for_id(&context_id, cx) {
427 existing_context
428 } else {
429 this.register_context(&context, cx);
430 this.synchronize_contexts(cx);
431 context
432 }
433 })
434 })
435 }
436
437 pub fn open_local_context(
438 &mut self,
439 path: PathBuf,
440 cx: &Context<Self>,
441 ) -> Task<Result<Entity<AssistantContext>>> {
442 if let Some(existing_context) = self.loaded_context_for_path(&path, cx) {
443 return Task::ready(Ok(existing_context));
444 }
445
446 let fs = self.fs.clone();
447 let languages = self.languages.clone();
448 let project = self.project.clone();
449 let telemetry = self.telemetry.clone();
450 let load = cx.background_spawn({
451 let path = path.clone();
452 async move {
453 let saved_context = fs.load(&path).await?;
454 SavedContext::from_json(&saved_context)
455 }
456 });
457 let prompt_builder = self.prompt_builder.clone();
458 let slash_commands = self.slash_commands.clone();
459
460 cx.spawn(|this, mut cx| async move {
461 let saved_context = load.await?;
462 let context = cx.new(|cx| {
463 AssistantContext::deserialize(
464 saved_context,
465 path.clone(),
466 languages,
467 prompt_builder,
468 slash_commands,
469 Some(project),
470 Some(telemetry),
471 cx,
472 )
473 })?;
474 this.update(&mut cx, |this, cx| {
475 if let Some(existing_context) = this.loaded_context_for_path(&path, cx) {
476 existing_context
477 } else {
478 this.register_context(&context, cx);
479 context
480 }
481 })
482 })
483 }
484
485 pub fn delete_local_context(
486 &mut self,
487 path: PathBuf,
488 cx: &mut Context<Self>,
489 ) -> Task<Result<()>> {
490 let fs = self.fs.clone();
491
492 cx.spawn(|this, mut cx| async move {
493 fs.remove_file(
494 &path,
495 RemoveOptions {
496 recursive: false,
497 ignore_if_not_exists: true,
498 },
499 )
500 .await?;
501
502 this.update(&mut cx, |this, cx| {
503 this.contexts.retain(|context| {
504 context
505 .upgrade()
506 .and_then(|context| context.read(cx).path())
507 != Some(&path)
508 });
509 this.contexts_metadata
510 .retain(|context| context.path != path);
511 })?;
512
513 Ok(())
514 })
515 }
516
517 fn loaded_context_for_path(&self, path: &Path, cx: &App) -> Option<Entity<AssistantContext>> {
518 self.contexts.iter().find_map(|context| {
519 let context = context.upgrade()?;
520 if context.read(cx).path() == Some(path) {
521 Some(context)
522 } else {
523 None
524 }
525 })
526 }
527
528 pub fn loaded_context_for_id(
529 &self,
530 id: &ContextId,
531 cx: &App,
532 ) -> Option<Entity<AssistantContext>> {
533 self.contexts.iter().find_map(|context| {
534 let context = context.upgrade()?;
535 if context.read(cx).id() == id {
536 Some(context)
537 } else {
538 None
539 }
540 })
541 }
542
543 pub fn open_remote_context(
544 &mut self,
545 context_id: ContextId,
546 cx: &mut Context<Self>,
547 ) -> Task<Result<Entity<AssistantContext>>> {
548 let project = self.project.read(cx);
549 let Some(project_id) = project.remote_id() else {
550 return Task::ready(Err(anyhow!("project was not remote")));
551 };
552
553 if let Some(context) = self.loaded_context_for_id(&context_id, cx) {
554 return Task::ready(Ok(context));
555 }
556
557 let replica_id = project.replica_id();
558 let capability = project.capability();
559 let language_registry = self.languages.clone();
560 let project = self.project.clone();
561 let telemetry = self.telemetry.clone();
562 let request = self.client.request(proto::OpenContext {
563 project_id,
564 context_id: context_id.to_proto(),
565 });
566 let prompt_builder = self.prompt_builder.clone();
567 let slash_commands = self.slash_commands.clone();
568 cx.spawn(|this, mut cx| async move {
569 let response = request.await?;
570 let context_proto = response.context.context("invalid context")?;
571 let context = cx.new(|cx| {
572 AssistantContext::new(
573 context_id.clone(),
574 replica_id,
575 capability,
576 language_registry,
577 prompt_builder,
578 slash_commands,
579 Some(project),
580 Some(telemetry),
581 cx,
582 )
583 })?;
584 let operations = cx
585 .background_spawn(async move {
586 context_proto
587 .operations
588 .into_iter()
589 .map(ContextOperation::from_proto)
590 .collect::<Result<Vec<_>>>()
591 })
592 .await?;
593 context.update(&mut cx, |context, cx| context.apply_ops(operations, cx))?;
594 this.update(&mut cx, |this, cx| {
595 if let Some(existing_context) = this.loaded_context_for_id(&context_id, cx) {
596 existing_context
597 } else {
598 this.register_context(&context, cx);
599 this.synchronize_contexts(cx);
600 context
601 }
602 })
603 })
604 }
605
606 fn register_context(&mut self, context: &Entity<AssistantContext>, cx: &mut Context<Self>) {
607 let handle = if self.project_is_shared {
608 ContextHandle::Strong(context.clone())
609 } else {
610 ContextHandle::Weak(context.downgrade())
611 };
612 self.contexts.push(handle);
613 self.advertise_contexts(cx);
614 cx.subscribe(context, Self::handle_context_event).detach();
615 }
616
617 fn handle_context_event(
618 &mut self,
619 context: Entity<AssistantContext>,
620 event: &ContextEvent,
621 cx: &mut Context<Self>,
622 ) {
623 let Some(project_id) = self.project.read(cx).remote_id() else {
624 return;
625 };
626
627 match event {
628 ContextEvent::SummaryChanged => {
629 self.advertise_contexts(cx);
630 }
631 ContextEvent::Operation(operation) => {
632 let context_id = context.read(cx).id().to_proto();
633 let operation = operation.to_proto();
634 self.client
635 .send(proto::UpdateContext {
636 project_id,
637 context_id,
638 operation: Some(operation),
639 })
640 .log_err();
641 }
642 _ => {}
643 }
644 }
645
646 fn advertise_contexts(&self, cx: &App) {
647 let Some(project_id) = self.project.read(cx).remote_id() else {
648 return;
649 };
650
651 // For now, only the host can advertise their open contexts.
652 if self.project.read(cx).is_via_collab() {
653 return;
654 }
655
656 let contexts = self
657 .contexts
658 .iter()
659 .rev()
660 .filter_map(|context| {
661 let context = context.upgrade()?.read(cx);
662 if context.replica_id() == ReplicaId::default() {
663 Some(proto::ContextMetadata {
664 context_id: context.id().to_proto(),
665 summary: context.summary().map(|summary| summary.text.clone()),
666 })
667 } else {
668 None
669 }
670 })
671 .collect();
672 self.client
673 .send(proto::AdvertiseContexts {
674 project_id,
675 contexts,
676 })
677 .ok();
678 }
679
680 fn synchronize_contexts(&mut self, cx: &mut Context<Self>) {
681 let Some(project_id) = self.project.read(cx).remote_id() else {
682 return;
683 };
684
685 let contexts = self
686 .contexts
687 .iter()
688 .filter_map(|context| {
689 let context = context.upgrade()?.read(cx);
690 if context.replica_id() != ReplicaId::default() {
691 Some(context.version(cx).to_proto(context.id().clone()))
692 } else {
693 None
694 }
695 })
696 .collect();
697
698 let client = self.client.clone();
699 let request = self.client.request(proto::SynchronizeContexts {
700 project_id,
701 contexts,
702 });
703 cx.spawn(|this, cx| async move {
704 let response = request.await?;
705
706 let mut context_ids = Vec::new();
707 let mut operations = Vec::new();
708 this.read_with(&cx, |this, cx| {
709 for context_version_proto in response.contexts {
710 let context_version = ContextVersion::from_proto(&context_version_proto);
711 let context_id = ContextId::from_proto(context_version_proto.context_id);
712 if let Some(context) = this.loaded_context_for_id(&context_id, cx) {
713 context_ids.push(context_id);
714 operations.push(context.read(cx).serialize_ops(&context_version, cx));
715 }
716 }
717 })?;
718
719 let operations = futures::future::join_all(operations).await;
720 for (context_id, operations) in context_ids.into_iter().zip(operations) {
721 for operation in operations {
722 client.send(proto::UpdateContext {
723 project_id,
724 context_id: context_id.to_proto(),
725 operation: Some(operation),
726 })?;
727 }
728 }
729
730 anyhow::Ok(())
731 })
732 .detach_and_log_err(cx);
733 }
734
735 pub fn search(&self, query: String, cx: &App) -> Task<Vec<SavedContextMetadata>> {
736 let metadata = self.contexts_metadata.clone();
737 let executor = cx.background_executor().clone();
738 cx.background_spawn(async move {
739 if query.is_empty() {
740 metadata
741 } else {
742 let candidates = metadata
743 .iter()
744 .enumerate()
745 .map(|(id, metadata)| StringMatchCandidate::new(id, &metadata.title))
746 .collect::<Vec<_>>();
747 let matches = fuzzy::match_strings(
748 &candidates,
749 &query,
750 false,
751 100,
752 &Default::default(),
753 executor,
754 )
755 .await;
756
757 matches
758 .into_iter()
759 .map(|mat| metadata[mat.candidate_id].clone())
760 .collect()
761 }
762 })
763 }
764
765 pub fn host_contexts(&self) -> &[RemoteContextMetadata] {
766 &self.host_contexts
767 }
768
769 fn reload(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
770 let fs = self.fs.clone();
771 cx.spawn(|this, mut cx| async move {
772 fs.create_dir(contexts_dir()).await?;
773
774 let mut paths = fs.read_dir(contexts_dir()).await?;
775 let mut contexts = Vec::<SavedContextMetadata>::new();
776 while let Some(path) = paths.next().await {
777 let path = path?;
778 if path.extension() != Some(OsStr::new("json")) {
779 continue;
780 }
781
782 static ASSISTANT_CONTEXT_REGEX: LazyLock<Regex> =
783 LazyLock::new(|| Regex::new(r" - \d+.zed.json$").unwrap());
784
785 let metadata = fs.metadata(&path).await?;
786 if let Some((file_name, metadata)) = path
787 .file_name()
788 .and_then(|name| name.to_str())
789 .zip(metadata)
790 {
791 // This is used to filter out contexts saved by the new assistant.
792 if !ASSISTANT_CONTEXT_REGEX.is_match(file_name) {
793 continue;
794 }
795
796 if let Some(title) = ASSISTANT_CONTEXT_REGEX
797 .replace(file_name, "")
798 .lines()
799 .next()
800 {
801 contexts.push(SavedContextMetadata {
802 title: title.to_string(),
803 path,
804 mtime: metadata.mtime.timestamp_for_user().into(),
805 });
806 }
807 }
808 }
809 contexts.sort_unstable_by_key(|context| Reverse(context.mtime));
810
811 this.update(&mut cx, |this, cx| {
812 this.contexts_metadata = contexts;
813 cx.notify();
814 })
815 })
816 }
817
818 pub fn restart_context_servers(&mut self, cx: &mut Context<Self>) {
819 cx.update_entity(
820 &self.context_server_manager,
821 |context_server_manager, cx| {
822 for server in context_server_manager.servers() {
823 context_server_manager
824 .restart_server(&server.id(), cx)
825 .detach_and_log_err(cx);
826 }
827 },
828 );
829 }
830
831 fn register_context_server_handlers(&self, cx: &mut Context<Self>) {
832 cx.subscribe(
833 &self.context_server_manager.clone(),
834 Self::handle_context_server_event,
835 )
836 .detach();
837 }
838
839 fn handle_context_server_event(
840 &mut self,
841 context_server_manager: Entity<ContextServerManager>,
842 event: &context_server::manager::Event,
843 cx: &mut Context<Self>,
844 ) {
845 let slash_command_working_set = self.slash_commands.clone();
846 match event {
847 context_server::manager::Event::ServerStarted { server_id } => {
848 if let Some(server) = context_server_manager.read(cx).get_server(server_id) {
849 let context_server_manager = context_server_manager.clone();
850 cx.spawn({
851 let server = server.clone();
852 let server_id = server_id.clone();
853 |this, mut cx| async move {
854 let Some(protocol) = server.client() else {
855 return;
856 };
857
858 if protocol.capable(context_server::protocol::ServerCapability::Prompts) {
859 if let Some(prompts) = protocol.list_prompts().await.log_err() {
860 let slash_command_ids = prompts
861 .into_iter()
862 .filter(assistant_slash_commands::acceptable_prompt)
863 .map(|prompt| {
864 log::info!(
865 "registering context server command: {:?}",
866 prompt.name
867 );
868 slash_command_working_set.insert(Arc::new(
869 assistant_slash_commands::ContextServerSlashCommand::new(
870 context_server_manager.clone(),
871 &server,
872 prompt,
873 ),
874 ))
875 })
876 .collect::<Vec<_>>();
877
878 this.update(&mut cx, |this, _cx| {
879 this.context_server_slash_command_ids
880 .insert(server_id.clone(), slash_command_ids);
881 })
882 .log_err();
883 }
884 }
885 }
886 })
887 .detach();
888 }
889 }
890 context_server::manager::Event::ServerStopped { server_id } => {
891 if let Some(slash_command_ids) =
892 self.context_server_slash_command_ids.remove(server_id)
893 {
894 slash_command_working_set.remove(&slash_command_ids);
895 }
896 }
897 }
898 }
899}