1use crate::{
2 AssistantContext, ContextEvent, ContextId, ContextOperation, ContextVersion, SavedContext,
3 SavedContextMetadata,
4};
5use anyhow::{Context as _, Result, anyhow};
6use assistant_slash_command::{SlashCommandId, SlashCommandWorkingSet};
7use client::{Client, TypedEnvelope, proto, telemetry::Telemetry};
8use clock::ReplicaId;
9use collections::HashMap;
10use context_server::ContextServerDescriptorRegistry;
11use context_server::manager::{ContextServerManager, ContextServerStatus};
12use fs::{Fs, RemoveOptions};
13use futures::StreamExt;
14use fuzzy::StringMatchCandidate;
15use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, WeakEntity};
16use language::LanguageRegistry;
17use paths::contexts_dir;
18use project::Project;
19use prompt_store::PromptBuilder;
20use regex::Regex;
21use rpc::AnyProtoClient;
22use std::sync::LazyLock;
23use std::{cmp::Reverse, ffi::OsStr, mem, path::Path, sync::Arc, time::Duration};
24use util::{ResultExt, TryFutureExt};
25
26pub(crate) fn init(client: &AnyProtoClient) {
27 client.add_entity_message_handler(ContextStore::handle_advertise_contexts);
28 client.add_entity_request_handler(ContextStore::handle_open_context);
29 client.add_entity_request_handler(ContextStore::handle_create_context);
30 client.add_entity_message_handler(ContextStore::handle_update_context);
31 client.add_entity_request_handler(ContextStore::handle_synchronize_contexts);
32}
33
34#[derive(Clone)]
35pub struct RemoteContextMetadata {
36 pub id: ContextId,
37 pub summary: Option<String>,
38}
39
40pub struct ContextStore {
41 contexts: Vec<ContextHandle>,
42 contexts_metadata: Vec<SavedContextMetadata>,
43 context_server_manager: Entity<ContextServerManager>,
44 context_server_slash_command_ids: HashMap<Arc<str>, Vec<SlashCommandId>>,
45 host_contexts: Vec<RemoteContextMetadata>,
46 fs: Arc<dyn Fs>,
47 languages: Arc<LanguageRegistry>,
48 slash_commands: Arc<SlashCommandWorkingSet>,
49 telemetry: Arc<Telemetry>,
50 _watch_updates: Task<Option<()>>,
51 client: Arc<Client>,
52 project: Entity<Project>,
53 project_is_shared: bool,
54 client_subscription: Option<client::Subscription>,
55 _project_subscriptions: Vec<gpui::Subscription>,
56 prompt_builder: Arc<PromptBuilder>,
57}
58
59pub enum ContextStoreEvent {
60 ContextCreated(ContextId),
61}
62
63impl EventEmitter<ContextStoreEvent> for ContextStore {}
64
65enum ContextHandle {
66 Weak(WeakEntity<AssistantContext>),
67 Strong(Entity<AssistantContext>),
68}
69
70impl ContextHandle {
71 fn upgrade(&self) -> Option<Entity<AssistantContext>> {
72 match self {
73 ContextHandle::Weak(weak) => weak.upgrade(),
74 ContextHandle::Strong(strong) => Some(strong.clone()),
75 }
76 }
77
78 fn downgrade(&self) -> WeakEntity<AssistantContext> {
79 match self {
80 ContextHandle::Weak(weak) => weak.clone(),
81 ContextHandle::Strong(strong) => strong.downgrade(),
82 }
83 }
84}
85
86impl ContextStore {
87 pub fn new(
88 project: Entity<Project>,
89 prompt_builder: Arc<PromptBuilder>,
90 slash_commands: Arc<SlashCommandWorkingSet>,
91 cx: &mut App,
92 ) -> Task<Result<Entity<Self>>> {
93 let fs = project.read(cx).fs().clone();
94 let languages = project.read(cx).languages().clone();
95 let telemetry = project.read(cx).client().telemetry().clone();
96 cx.spawn(async move |cx| {
97 const CONTEXT_WATCH_DURATION: Duration = Duration::from_millis(100);
98 let (mut events, _) = fs.watch(contexts_dir(), CONTEXT_WATCH_DURATION).await;
99
100 let this = cx.new(|cx: &mut Context<Self>| {
101 let context_server_factory_registry =
102 ContextServerDescriptorRegistry::default_global(cx);
103 let context_server_manager = cx.new(|cx| {
104 ContextServerManager::new(context_server_factory_registry, project.clone(), cx)
105 });
106 let mut this = Self {
107 contexts: Vec::new(),
108 contexts_metadata: Vec::new(),
109 context_server_manager,
110 context_server_slash_command_ids: HashMap::default(),
111 host_contexts: Vec::new(),
112 fs,
113 languages,
114 slash_commands,
115 telemetry,
116 _watch_updates: cx.spawn(async move |this, cx| {
117 async move {
118 while events.next().await.is_some() {
119 this.update(cx, |this, cx| this.reload(cx))?.await.log_err();
120 }
121 anyhow::Ok(())
122 }
123 .log_err()
124 .await
125 }),
126 client_subscription: None,
127 _project_subscriptions: vec![
128 cx.subscribe(&project, Self::handle_project_event),
129 ],
130 project_is_shared: false,
131 client: project.read(cx).client(),
132 project: project.clone(),
133 prompt_builder,
134 };
135 this.handle_project_shared(project.clone(), cx);
136 this.synchronize_contexts(cx);
137 this.register_context_server_handlers(cx);
138 this.reload(cx).detach_and_log_err(cx);
139 this
140 })?;
141
142 Ok(this)
143 })
144 }
145
146 async fn handle_advertise_contexts(
147 this: Entity<Self>,
148 envelope: TypedEnvelope<proto::AdvertiseContexts>,
149 mut cx: AsyncApp,
150 ) -> Result<()> {
151 this.update(&mut cx, |this, cx| {
152 this.host_contexts = envelope
153 .payload
154 .contexts
155 .into_iter()
156 .map(|context| RemoteContextMetadata {
157 id: ContextId::from_proto(context.context_id),
158 summary: context.summary,
159 })
160 .collect();
161 cx.notify();
162 })
163 }
164
165 async fn handle_open_context(
166 this: Entity<Self>,
167 envelope: TypedEnvelope<proto::OpenContext>,
168 mut cx: AsyncApp,
169 ) -> Result<proto::OpenContextResponse> {
170 let context_id = ContextId::from_proto(envelope.payload.context_id);
171 let operations = this.update(&mut cx, |this, cx| {
172 if this.project.read(cx).is_via_collab() {
173 return Err(anyhow!("only the host contexts can be opened"));
174 }
175
176 let context = this
177 .loaded_context_for_id(&context_id, cx)
178 .context("context not found")?;
179 if context.read(cx).replica_id() != ReplicaId::default() {
180 return Err(anyhow!("context must be opened via the host"));
181 }
182
183 anyhow::Ok(
184 context
185 .read(cx)
186 .serialize_ops(&ContextVersion::default(), cx),
187 )
188 })??;
189 let operations = operations.await;
190 Ok(proto::OpenContextResponse {
191 context: Some(proto::Context { operations }),
192 })
193 }
194
195 async fn handle_create_context(
196 this: Entity<Self>,
197 _: TypedEnvelope<proto::CreateContext>,
198 mut cx: AsyncApp,
199 ) -> Result<proto::CreateContextResponse> {
200 let (context_id, operations) = this.update(&mut cx, |this, cx| {
201 if this.project.read(cx).is_via_collab() {
202 return Err(anyhow!("can only create contexts as the host"));
203 }
204
205 let context = this.create(cx);
206 let context_id = context.read(cx).id().clone();
207 cx.emit(ContextStoreEvent::ContextCreated(context_id.clone()));
208
209 anyhow::Ok((
210 context_id,
211 context
212 .read(cx)
213 .serialize_ops(&ContextVersion::default(), cx),
214 ))
215 })??;
216 let operations = operations.await;
217 Ok(proto::CreateContextResponse {
218 context_id: context_id.to_proto(),
219 context: Some(proto::Context { operations }),
220 })
221 }
222
223 async fn handle_update_context(
224 this: Entity<Self>,
225 envelope: TypedEnvelope<proto::UpdateContext>,
226 mut cx: AsyncApp,
227 ) -> Result<()> {
228 this.update(&mut cx, |this, cx| {
229 let context_id = ContextId::from_proto(envelope.payload.context_id);
230 if let Some(context) = this.loaded_context_for_id(&context_id, cx) {
231 let operation_proto = envelope.payload.operation.context("invalid operation")?;
232 let operation = ContextOperation::from_proto(operation_proto)?;
233 context.update(cx, |context, cx| context.apply_ops([operation], cx));
234 }
235 Ok(())
236 })?
237 }
238
239 async fn handle_synchronize_contexts(
240 this: Entity<Self>,
241 envelope: TypedEnvelope<proto::SynchronizeContexts>,
242 mut cx: AsyncApp,
243 ) -> Result<proto::SynchronizeContextsResponse> {
244 this.update(&mut cx, |this, cx| {
245 if this.project.read(cx).is_via_collab() {
246 return Err(anyhow!("only the host can synchronize contexts"));
247 }
248
249 let mut local_versions = Vec::new();
250 for remote_version_proto in envelope.payload.contexts {
251 let remote_version = ContextVersion::from_proto(&remote_version_proto);
252 let context_id = ContextId::from_proto(remote_version_proto.context_id);
253 if let Some(context) = this.loaded_context_for_id(&context_id, cx) {
254 let context = context.read(cx);
255 let operations = context.serialize_ops(&remote_version, cx);
256 local_versions.push(context.version(cx).to_proto(context_id.clone()));
257 let client = this.client.clone();
258 let project_id = envelope.payload.project_id;
259 cx.background_spawn(async move {
260 let operations = operations.await;
261 for operation in operations {
262 client.send(proto::UpdateContext {
263 project_id,
264 context_id: context_id.to_proto(),
265 operation: Some(operation),
266 })?;
267 }
268 anyhow::Ok(())
269 })
270 .detach_and_log_err(cx);
271 }
272 }
273
274 this.advertise_contexts(cx);
275
276 anyhow::Ok(proto::SynchronizeContextsResponse {
277 contexts: local_versions,
278 })
279 })?
280 }
281
282 fn handle_project_shared(&mut self, _: Entity<Project>, cx: &mut Context<Self>) {
283 let is_shared = self.project.read(cx).is_shared();
284 let was_shared = mem::replace(&mut self.project_is_shared, is_shared);
285 if is_shared == was_shared {
286 return;
287 }
288
289 if is_shared {
290 self.contexts.retain_mut(|context| {
291 if let Some(strong_context) = context.upgrade() {
292 *context = ContextHandle::Strong(strong_context);
293 true
294 } else {
295 false
296 }
297 });
298 let remote_id = self.project.read(cx).remote_id().unwrap();
299 self.client_subscription = self
300 .client
301 .subscribe_to_entity(remote_id)
302 .log_err()
303 .map(|subscription| subscription.set_entity(&cx.entity(), &mut cx.to_async()));
304 self.advertise_contexts(cx);
305 } else {
306 self.client_subscription = None;
307 }
308 }
309
310 fn handle_project_event(
311 &mut self,
312 project: Entity<Project>,
313 event: &project::Event,
314 cx: &mut Context<Self>,
315 ) {
316 match event {
317 project::Event::RemoteIdChanged(_) => {
318 self.handle_project_shared(project, cx);
319 }
320 project::Event::Reshared => {
321 self.advertise_contexts(cx);
322 }
323 project::Event::HostReshared | project::Event::Rejoined => {
324 self.synchronize_contexts(cx);
325 }
326 project::Event::DisconnectedFromHost => {
327 self.contexts.retain_mut(|context| {
328 if let Some(strong_context) = context.upgrade() {
329 *context = ContextHandle::Weak(context.downgrade());
330 strong_context.update(cx, |context, cx| {
331 if context.replica_id() != ReplicaId::default() {
332 context.set_capability(language::Capability::ReadOnly, cx);
333 }
334 });
335 true
336 } else {
337 false
338 }
339 });
340 self.host_contexts.clear();
341 cx.notify();
342 }
343 _ => {}
344 }
345 }
346
347 pub fn contexts(&self) -> Vec<SavedContextMetadata> {
348 let mut contexts = self.contexts_metadata.iter().cloned().collect::<Vec<_>>();
349 contexts.sort_unstable_by_key(|thread| std::cmp::Reverse(thread.mtime));
350 contexts
351 }
352
353 pub fn create(&mut self, cx: &mut Context<Self>) -> Entity<AssistantContext> {
354 let context = cx.new(|cx| {
355 AssistantContext::local(
356 self.languages.clone(),
357 Some(self.project.clone()),
358 Some(self.telemetry.clone()),
359 self.prompt_builder.clone(),
360 self.slash_commands.clone(),
361 cx,
362 )
363 });
364 self.register_context(&context, cx);
365 context
366 }
367
368 pub fn create_remote_context(
369 &mut self,
370 cx: &mut Context<Self>,
371 ) -> Task<Result<Entity<AssistantContext>>> {
372 let project = self.project.read(cx);
373 let Some(project_id) = project.remote_id() else {
374 return Task::ready(Err(anyhow!("project was not remote")));
375 };
376
377 let replica_id = project.replica_id();
378 let capability = project.capability();
379 let language_registry = self.languages.clone();
380 let project = self.project.clone();
381 let telemetry = self.telemetry.clone();
382 let prompt_builder = self.prompt_builder.clone();
383 let slash_commands = self.slash_commands.clone();
384 let request = self.client.request(proto::CreateContext { project_id });
385 cx.spawn(async move |this, cx| {
386 let response = request.await?;
387 let context_id = ContextId::from_proto(response.context_id);
388 let context_proto = response.context.context("invalid context")?;
389 let context = cx.new(|cx| {
390 AssistantContext::new(
391 context_id.clone(),
392 replica_id,
393 capability,
394 language_registry,
395 prompt_builder,
396 slash_commands,
397 Some(project),
398 Some(telemetry),
399 cx,
400 )
401 })?;
402 let operations = cx
403 .background_spawn(async move {
404 context_proto
405 .operations
406 .into_iter()
407 .map(ContextOperation::from_proto)
408 .collect::<Result<Vec<_>>>()
409 })
410 .await?;
411 context.update(cx, |context, cx| context.apply_ops(operations, cx))?;
412 this.update(cx, |this, cx| {
413 if let Some(existing_context) = this.loaded_context_for_id(&context_id, cx) {
414 existing_context
415 } else {
416 this.register_context(&context, cx);
417 this.synchronize_contexts(cx);
418 context
419 }
420 })
421 })
422 }
423
424 pub fn open_local_context(
425 &mut self,
426 path: Arc<Path>,
427 cx: &Context<Self>,
428 ) -> Task<Result<Entity<AssistantContext>>> {
429 if let Some(existing_context) = self.loaded_context_for_path(&path, cx) {
430 return Task::ready(Ok(existing_context));
431 }
432
433 let fs = self.fs.clone();
434 let languages = self.languages.clone();
435 let project = self.project.clone();
436 let telemetry = self.telemetry.clone();
437 let load = cx.background_spawn({
438 let path = path.clone();
439 async move {
440 let saved_context = fs.load(&path).await?;
441 SavedContext::from_json(&saved_context)
442 }
443 });
444 let prompt_builder = self.prompt_builder.clone();
445 let slash_commands = self.slash_commands.clone();
446
447 cx.spawn(async move |this, cx| {
448 let saved_context = load.await?;
449 let context = cx.new(|cx| {
450 AssistantContext::deserialize(
451 saved_context,
452 path.clone(),
453 languages,
454 prompt_builder,
455 slash_commands,
456 Some(project),
457 Some(telemetry),
458 cx,
459 )
460 })?;
461 this.update(cx, |this, cx| {
462 if let Some(existing_context) = this.loaded_context_for_path(&path, cx) {
463 existing_context
464 } else {
465 this.register_context(&context, cx);
466 context
467 }
468 })
469 })
470 }
471
472 pub fn delete_local_context(
473 &mut self,
474 path: Arc<Path>,
475 cx: &mut Context<Self>,
476 ) -> Task<Result<()>> {
477 let fs = self.fs.clone();
478
479 cx.spawn(async move |this, cx| {
480 fs.remove_file(
481 &path,
482 RemoveOptions {
483 recursive: false,
484 ignore_if_not_exists: true,
485 },
486 )
487 .await?;
488
489 this.update(cx, |this, cx| {
490 this.contexts.retain(|context| {
491 context
492 .upgrade()
493 .and_then(|context| context.read(cx).path())
494 != Some(&path)
495 });
496 this.contexts_metadata
497 .retain(|context| context.path.as_ref() != path.as_ref());
498 })?;
499
500 Ok(())
501 })
502 }
503
504 fn loaded_context_for_path(&self, path: &Path, cx: &App) -> Option<Entity<AssistantContext>> {
505 self.contexts.iter().find_map(|context| {
506 let context = context.upgrade()?;
507 if context.read(cx).path().map(Arc::as_ref) == Some(path) {
508 Some(context)
509 } else {
510 None
511 }
512 })
513 }
514
515 pub fn loaded_context_for_id(
516 &self,
517 id: &ContextId,
518 cx: &App,
519 ) -> Option<Entity<AssistantContext>> {
520 self.contexts.iter().find_map(|context| {
521 let context = context.upgrade()?;
522 if context.read(cx).id() == id {
523 Some(context)
524 } else {
525 None
526 }
527 })
528 }
529
530 pub fn open_remote_context(
531 &mut self,
532 context_id: ContextId,
533 cx: &mut Context<Self>,
534 ) -> Task<Result<Entity<AssistantContext>>> {
535 let project = self.project.read(cx);
536 let Some(project_id) = project.remote_id() else {
537 return Task::ready(Err(anyhow!("project was not remote")));
538 };
539
540 if let Some(context) = self.loaded_context_for_id(&context_id, cx) {
541 return Task::ready(Ok(context));
542 }
543
544 let replica_id = project.replica_id();
545 let capability = project.capability();
546 let language_registry = self.languages.clone();
547 let project = self.project.clone();
548 let telemetry = self.telemetry.clone();
549 let request = self.client.request(proto::OpenContext {
550 project_id,
551 context_id: context_id.to_proto(),
552 });
553 let prompt_builder = self.prompt_builder.clone();
554 let slash_commands = self.slash_commands.clone();
555 cx.spawn(async move |this, cx| {
556 let response = request.await?;
557 let context_proto = response.context.context("invalid context")?;
558 let context = cx.new(|cx| {
559 AssistantContext::new(
560 context_id.clone(),
561 replica_id,
562 capability,
563 language_registry,
564 prompt_builder,
565 slash_commands,
566 Some(project),
567 Some(telemetry),
568 cx,
569 )
570 })?;
571 let operations = cx
572 .background_spawn(async move {
573 context_proto
574 .operations
575 .into_iter()
576 .map(ContextOperation::from_proto)
577 .collect::<Result<Vec<_>>>()
578 })
579 .await?;
580 context.update(cx, |context, cx| context.apply_ops(operations, cx))?;
581 this.update(cx, |this, cx| {
582 if let Some(existing_context) = this.loaded_context_for_id(&context_id, cx) {
583 existing_context
584 } else {
585 this.register_context(&context, cx);
586 this.synchronize_contexts(cx);
587 context
588 }
589 })
590 })
591 }
592
593 fn register_context(&mut self, context: &Entity<AssistantContext>, cx: &mut Context<Self>) {
594 let handle = if self.project_is_shared {
595 ContextHandle::Strong(context.clone())
596 } else {
597 ContextHandle::Weak(context.downgrade())
598 };
599 self.contexts.push(handle);
600 self.advertise_contexts(cx);
601 cx.subscribe(context, Self::handle_context_event).detach();
602 }
603
604 fn handle_context_event(
605 &mut self,
606 context: Entity<AssistantContext>,
607 event: &ContextEvent,
608 cx: &mut Context<Self>,
609 ) {
610 let Some(project_id) = self.project.read(cx).remote_id() else {
611 return;
612 };
613
614 match event {
615 ContextEvent::SummaryChanged => {
616 self.advertise_contexts(cx);
617 }
618 ContextEvent::Operation(operation) => {
619 let context_id = context.read(cx).id().to_proto();
620 let operation = operation.to_proto();
621 self.client
622 .send(proto::UpdateContext {
623 project_id,
624 context_id,
625 operation: Some(operation),
626 })
627 .log_err();
628 }
629 _ => {}
630 }
631 }
632
633 fn advertise_contexts(&self, cx: &App) {
634 let Some(project_id) = self.project.read(cx).remote_id() else {
635 return;
636 };
637
638 // For now, only the host can advertise their open contexts.
639 if self.project.read(cx).is_via_collab() {
640 return;
641 }
642
643 let contexts = self
644 .contexts
645 .iter()
646 .rev()
647 .filter_map(|context| {
648 let context = context.upgrade()?.read(cx);
649 if context.replica_id() == ReplicaId::default() {
650 Some(proto::ContextMetadata {
651 context_id: context.id().to_proto(),
652 summary: context.summary().map(|summary| summary.text.clone()),
653 })
654 } else {
655 None
656 }
657 })
658 .collect();
659 self.client
660 .send(proto::AdvertiseContexts {
661 project_id,
662 contexts,
663 })
664 .ok();
665 }
666
667 fn synchronize_contexts(&mut self, cx: &mut Context<Self>) {
668 let Some(project_id) = self.project.read(cx).remote_id() else {
669 return;
670 };
671
672 let contexts = self
673 .contexts
674 .iter()
675 .filter_map(|context| {
676 let context = context.upgrade()?.read(cx);
677 if context.replica_id() != ReplicaId::default() {
678 Some(context.version(cx).to_proto(context.id().clone()))
679 } else {
680 None
681 }
682 })
683 .collect();
684
685 let client = self.client.clone();
686 let request = self.client.request(proto::SynchronizeContexts {
687 project_id,
688 contexts,
689 });
690 cx.spawn(async move |this, cx| {
691 let response = request.await?;
692
693 let mut context_ids = Vec::new();
694 let mut operations = Vec::new();
695 this.read_with(cx, |this, cx| {
696 for context_version_proto in response.contexts {
697 let context_version = ContextVersion::from_proto(&context_version_proto);
698 let context_id = ContextId::from_proto(context_version_proto.context_id);
699 if let Some(context) = this.loaded_context_for_id(&context_id, cx) {
700 context_ids.push(context_id);
701 operations.push(context.read(cx).serialize_ops(&context_version, cx));
702 }
703 }
704 })?;
705
706 let operations = futures::future::join_all(operations).await;
707 for (context_id, operations) in context_ids.into_iter().zip(operations) {
708 for operation in operations {
709 client.send(proto::UpdateContext {
710 project_id,
711 context_id: context_id.to_proto(),
712 operation: Some(operation),
713 })?;
714 }
715 }
716
717 anyhow::Ok(())
718 })
719 .detach_and_log_err(cx);
720 }
721
722 pub fn search(&self, query: String, cx: &App) -> Task<Vec<SavedContextMetadata>> {
723 let metadata = self.contexts_metadata.clone();
724 let executor = cx.background_executor().clone();
725 cx.background_spawn(async move {
726 if query.is_empty() {
727 metadata
728 } else {
729 let candidates = metadata
730 .iter()
731 .enumerate()
732 .map(|(id, metadata)| StringMatchCandidate::new(id, &metadata.title))
733 .collect::<Vec<_>>();
734 let matches = fuzzy::match_strings(
735 &candidates,
736 &query,
737 false,
738 100,
739 &Default::default(),
740 executor,
741 )
742 .await;
743
744 matches
745 .into_iter()
746 .map(|mat| metadata[mat.candidate_id].clone())
747 .collect()
748 }
749 })
750 }
751
752 pub fn host_contexts(&self) -> &[RemoteContextMetadata] {
753 &self.host_contexts
754 }
755
756 fn reload(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
757 let fs = self.fs.clone();
758 cx.spawn(async move |this, cx| {
759 fs.create_dir(contexts_dir()).await?;
760
761 let mut paths = fs.read_dir(contexts_dir()).await?;
762 let mut contexts = Vec::<SavedContextMetadata>::new();
763 while let Some(path) = paths.next().await {
764 let path = path?;
765 if path.extension() != Some(OsStr::new("json")) {
766 continue;
767 }
768
769 static ASSISTANT_CONTEXT_REGEX: LazyLock<Regex> =
770 LazyLock::new(|| Regex::new(r" - \d+.zed.json$").unwrap());
771
772 let metadata = fs.metadata(&path).await?;
773 if let Some((file_name, metadata)) = path
774 .file_name()
775 .and_then(|name| name.to_str())
776 .zip(metadata)
777 {
778 // This is used to filter out contexts saved by the new assistant.
779 if !ASSISTANT_CONTEXT_REGEX.is_match(file_name) {
780 continue;
781 }
782
783 if let Some(title) = ASSISTANT_CONTEXT_REGEX
784 .replace(file_name, "")
785 .lines()
786 .next()
787 {
788 contexts.push(SavedContextMetadata {
789 title: title.to_string(),
790 path: path.into(),
791 mtime: metadata.mtime.timestamp_for_user().into(),
792 });
793 }
794 }
795 }
796 contexts.sort_unstable_by_key(|context| Reverse(context.mtime));
797
798 this.update(cx, |this, cx| {
799 this.contexts_metadata = contexts;
800 cx.notify();
801 })
802 })
803 }
804
805 pub fn restart_context_servers(&mut self, cx: &mut Context<Self>) {
806 cx.update_entity(
807 &self.context_server_manager,
808 |context_server_manager, cx| {
809 for server in context_server_manager.running_servers() {
810 context_server_manager
811 .restart_server(&server.id(), cx)
812 .detach_and_log_err(cx);
813 }
814 },
815 );
816 }
817
818 fn register_context_server_handlers(&self, cx: &mut Context<Self>) {
819 cx.subscribe(
820 &self.context_server_manager.clone(),
821 Self::handle_context_server_event,
822 )
823 .detach();
824 }
825
826 fn handle_context_server_event(
827 &mut self,
828 context_server_manager: Entity<ContextServerManager>,
829 event: &context_server::manager::Event,
830 cx: &mut Context<Self>,
831 ) {
832 let slash_command_working_set = self.slash_commands.clone();
833 match event {
834 context_server::manager::Event::ServerStatusChanged { server_id, status } => {
835 match status {
836 Some(ContextServerStatus::Running) => {
837 if let Some(server) = context_server_manager.read(cx).get_server(server_id)
838 {
839 let context_server_manager = context_server_manager.clone();
840 cx.spawn({
841 let server = server.clone();
842 let server_id = server_id.clone();
843 async move |this, cx| {
844 let Some(protocol) = server.client() else {
845 return;
846 };
847
848 if protocol.capable(context_server::protocol::ServerCapability::Prompts) {
849 if let Some(prompts) = protocol.list_prompts().await.log_err() {
850 let slash_command_ids = prompts
851 .into_iter()
852 .filter(assistant_slash_commands::acceptable_prompt)
853 .map(|prompt| {
854 log::info!(
855 "registering context server command: {:?}",
856 prompt.name
857 );
858 slash_command_working_set.insert(Arc::new(
859 assistant_slash_commands::ContextServerSlashCommand::new(
860 context_server_manager.clone(),
861 &server,
862 prompt,
863 ),
864 ))
865 })
866 .collect::<Vec<_>>();
867
868 this.update( cx, |this, _cx| {
869 this.context_server_slash_command_ids
870 .insert(server_id.clone(), slash_command_ids);
871 })
872 .log_err();
873 }
874 }
875 }
876 })
877 .detach();
878 }
879 }
880 None => {
881 if let Some(slash_command_ids) =
882 self.context_server_slash_command_ids.remove(server_id)
883 {
884 slash_command_working_set.remove(&slash_command_ids);
885 }
886 }
887 _ => {}
888 }
889 }
890 }
891 }
892}