1use crate::{
2 AssistantContext, ContextEvent, ContextId, ContextOperation, ContextVersion, SavedContext,
3 SavedContextMetadata,
4};
5use anyhow::{Context as _, Result, anyhow};
6use assistant_slash_command::{SlashCommandId, SlashCommandWorkingSet};
7use client::{Client, TypedEnvelope, proto, telemetry::Telemetry};
8use clock::ReplicaId;
9use collections::HashMap;
10use context_server::ContextServerFactoryRegistry;
11use context_server::manager::ContextServerManager;
12use fs::{Fs, RemoveOptions};
13use futures::StreamExt;
14use fuzzy::StringMatchCandidate;
15use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, WeakEntity};
16use language::LanguageRegistry;
17use paths::contexts_dir;
18use project::Project;
19use prompt_store::PromptBuilder;
20use regex::Regex;
21use rpc::AnyProtoClient;
22use std::sync::LazyLock;
23use std::{
24 cmp::Reverse,
25 ffi::OsStr,
26 mem,
27 path::{Path, PathBuf},
28 sync::Arc,
29 time::Duration,
30};
31use util::{ResultExt, TryFutureExt};
32
33pub(crate) fn init(client: &AnyProtoClient) {
34 client.add_entity_message_handler(ContextStore::handle_advertise_contexts);
35 client.add_entity_request_handler(ContextStore::handle_open_context);
36 client.add_entity_request_handler(ContextStore::handle_create_context);
37 client.add_entity_message_handler(ContextStore::handle_update_context);
38 client.add_entity_request_handler(ContextStore::handle_synchronize_contexts);
39}
40
/// Identifier and optional summary of a context advertised by the project host.
///
/// `ContextStore` builds these from the entries of an incoming
/// `proto::AdvertiseContexts` message.
#[derive(Clone)]
pub struct RemoteContextMetadata {
    /// Identifier of the advertised context.
    pub id: ContextId,
    /// Summary text carried by the advertisement, if any was sent.
    pub summary: Option<String>,
}
46
/// Tracks the assistant contexts of a project: the contexts currently loaded in memory,
/// the metadata of contexts saved under `contexts_dir()`, and the contexts advertised by
/// a remote host. It also relays context operations to and from collaborators.
pub struct ContextStore {
    /// Handles to every registered context. A context is held strongly when it is
    /// registered while the project is shared, and weakly otherwise.
    contexts: Vec<ContextHandle>,
    /// Metadata of the saved contexts found by the last `reload`.
    contexts_metadata: Vec<SavedContextMetadata>,
    /// Manager whose server start/stop events drive slash command registration.
    context_server_manager: Entity<ContextServerManager>,
    /// Slash command ids registered per context server id, kept so they can be removed
    /// again when that server stops.
    context_server_slash_command_ids: HashMap<Arc<str>, Vec<SlashCommandId>>,
    /// Contexts most recently advertised by the host; cleared on disconnect.
    host_contexts: Vec<RemoteContextMetadata>,
    /// File system used to watch, list, load and delete saved contexts.
    fs: Arc<dyn Fs>,
    /// Language registry handed to every context this store creates or loads.
    languages: Arc<LanguageRegistry>,
    /// Working set of slash commands shared with created contexts and context servers.
    slash_commands: Arc<SlashCommandWorkingSet>,
    /// Telemetry handle obtained from the project's client; handed to created contexts.
    telemetry: Arc<Telemetry>,
    /// Task that calls `reload` for every event from the contexts directory watcher.
    _watch_updates: Task<Option<()>>,
    /// RPC client used to send and request context messages.
    client: Arc<Client>,
    /// Project this store belongs to.
    project: Entity<Project>,
    /// Value of `project.is_shared()` as of the last `handle_project_shared` call.
    project_is_shared: bool,
    /// Entity subscription on the client, present only while the project is shared.
    client_subscription: Option<client::Subscription>,
    /// Keeps the project event subscription alive for the lifetime of the store.
    _project_subscriptions: Vec<gpui::Subscription>,
    /// Prompt builder handed to every context this store creates or loads.
    prompt_builder: Arc<PromptBuilder>,
}
65
/// Events emitted by [`ContextStore`].
pub enum ContextStoreEvent {
    /// A context with the given id was created in response to a `CreateContext` request.
    ContextCreated(ContextId),
}

// Allows `ContextStore` entities to emit `ContextStoreEvent`s via `cx.emit`.
impl EventEmitter<ContextStoreEvent> for ContextStore {}
71
/// A reference to a registered context that either keeps it alive or does not.
enum ContextHandle {
    /// Does not keep the context alive; upgrading may fail once the context is dropped.
    Weak(WeakEntity<AssistantContext>),
    /// Keeps the context alive for as long as the handle is stored.
    Strong(Entity<AssistantContext>),
}
76
77impl ContextHandle {
78 fn upgrade(&self) -> Option<Entity<AssistantContext>> {
79 match self {
80 ContextHandle::Weak(weak) => weak.upgrade(),
81 ContextHandle::Strong(strong) => Some(strong.clone()),
82 }
83 }
84
85 fn downgrade(&self) -> WeakEntity<AssistantContext> {
86 match self {
87 ContextHandle::Weak(weak) => weak.clone(),
88 ContextHandle::Strong(strong) => strong.downgrade(),
89 }
90 }
91}
92
93impl ContextStore {
    /// Asynchronously creates a `ContextStore` entity for `project`.
    ///
    /// The returned task first starts watching `contexts_dir()` on the project's file
    /// system, then builds the store entity and resolves to it.
    ///
    /// # Errors
    ///
    /// The task resolves to an error if creating the entity through the async context
    /// fails (the `?` applied to `cx.new`).
    pub fn new(
        project: Entity<Project>,
        prompt_builder: Arc<PromptBuilder>,
        slash_commands: Arc<SlashCommandWorkingSet>,
        cx: &mut App,
    ) -> Task<Result<Entity<Self>>> {
        // Capture everything needed from the project before moving into the task.
        let fs = project.read(cx).fs().clone();
        let languages = project.read(cx).languages().clone();
        let telemetry = project.read(cx).client().telemetry().clone();
        cx.spawn(async move |cx| {
            const CONTEXT_WATCH_DURATION: Duration = Duration::from_millis(100);
            // Only the event stream is kept; the second element of the tuple is discarded.
            let (mut events, _) = fs.watch(contexts_dir(), CONTEXT_WATCH_DURATION).await;

            let this = cx.new(|cx: &mut Context<Self>| {
                let context_server_factory_registry =
                    ContextServerFactoryRegistry::default_global(cx);
                let context_server_manager = cx.new(|cx| {
                    ContextServerManager::new(context_server_factory_registry, project.clone(), cx)
                });
                let mut this = Self {
                    contexts: Vec::new(),
                    contexts_metadata: Vec::new(),
                    context_server_manager,
                    context_server_slash_command_ids: HashMap::default(),
                    host_contexts: Vec::new(),
                    fs,
                    languages,
                    slash_commands,
                    telemetry,
                    // Reload the saved-context list on every watcher event. A failed
                    // reload is logged and the loop continues; the loop ends when the
                    // event stream ends or the store can no longer be updated.
                    _watch_updates: cx.spawn(async move |this, cx| {
                        async move {
                            while events.next().await.is_some() {
                                this.update(cx, |this, cx| this.reload(cx))?.await.log_err();
                            }
                            anyhow::Ok(())
                        }
                        .log_err()
                        .await
                    }),
                    client_subscription: None,
                    _project_subscriptions: vec![
                        cx.subscribe(&project, Self::handle_project_event),
                    ],
                    // Starts out as `false`; `handle_project_shared` below brings it in
                    // line with the project's actual state.
                    project_is_shared: false,
                    client: project.read(cx).client(),
                    project: project.clone(),
                    prompt_builder,
                };
                // Bring the fresh store in sync with the project, the host, the context
                // servers, and the contexts already saved on disk.
                this.handle_project_shared(project.clone(), cx);
                this.synchronize_contexts(cx);
                this.register_context_server_handlers(cx);
                this.reload(cx).detach_and_log_err(cx);
                this
            })?;

            Ok(this)
        })
    }
152
153 async fn handle_advertise_contexts(
154 this: Entity<Self>,
155 envelope: TypedEnvelope<proto::AdvertiseContexts>,
156 mut cx: AsyncApp,
157 ) -> Result<()> {
158 this.update(&mut cx, |this, cx| {
159 this.host_contexts = envelope
160 .payload
161 .contexts
162 .into_iter()
163 .map(|context| RemoteContextMetadata {
164 id: ContextId::from_proto(context.context_id),
165 summary: context.summary,
166 })
167 .collect();
168 cx.notify();
169 })
170 }
171
172 async fn handle_open_context(
173 this: Entity<Self>,
174 envelope: TypedEnvelope<proto::OpenContext>,
175 mut cx: AsyncApp,
176 ) -> Result<proto::OpenContextResponse> {
177 let context_id = ContextId::from_proto(envelope.payload.context_id);
178 let operations = this.update(&mut cx, |this, cx| {
179 if this.project.read(cx).is_via_collab() {
180 return Err(anyhow!("only the host contexts can be opened"));
181 }
182
183 let context = this
184 .loaded_context_for_id(&context_id, cx)
185 .context("context not found")?;
186 if context.read(cx).replica_id() != ReplicaId::default() {
187 return Err(anyhow!("context must be opened via the host"));
188 }
189
190 anyhow::Ok(
191 context
192 .read(cx)
193 .serialize_ops(&ContextVersion::default(), cx),
194 )
195 })??;
196 let operations = operations.await;
197 Ok(proto::OpenContextResponse {
198 context: Some(proto::Context { operations }),
199 })
200 }
201
202 async fn handle_create_context(
203 this: Entity<Self>,
204 _: TypedEnvelope<proto::CreateContext>,
205 mut cx: AsyncApp,
206 ) -> Result<proto::CreateContextResponse> {
207 let (context_id, operations) = this.update(&mut cx, |this, cx| {
208 if this.project.read(cx).is_via_collab() {
209 return Err(anyhow!("can only create contexts as the host"));
210 }
211
212 let context = this.create(cx);
213 let context_id = context.read(cx).id().clone();
214 cx.emit(ContextStoreEvent::ContextCreated(context_id.clone()));
215
216 anyhow::Ok((
217 context_id,
218 context
219 .read(cx)
220 .serialize_ops(&ContextVersion::default(), cx),
221 ))
222 })??;
223 let operations = operations.await;
224 Ok(proto::CreateContextResponse {
225 context_id: context_id.to_proto(),
226 context: Some(proto::Context { operations }),
227 })
228 }
229
230 async fn handle_update_context(
231 this: Entity<Self>,
232 envelope: TypedEnvelope<proto::UpdateContext>,
233 mut cx: AsyncApp,
234 ) -> Result<()> {
235 this.update(&mut cx, |this, cx| {
236 let context_id = ContextId::from_proto(envelope.payload.context_id);
237 if let Some(context) = this.loaded_context_for_id(&context_id, cx) {
238 let operation_proto = envelope.payload.operation.context("invalid operation")?;
239 let operation = ContextOperation::from_proto(operation_proto)?;
240 context.update(cx, |context, cx| context.apply_ops([operation], cx));
241 }
242 Ok(())
243 })?
244 }
245
246 async fn handle_synchronize_contexts(
247 this: Entity<Self>,
248 envelope: TypedEnvelope<proto::SynchronizeContexts>,
249 mut cx: AsyncApp,
250 ) -> Result<proto::SynchronizeContextsResponse> {
251 this.update(&mut cx, |this, cx| {
252 if this.project.read(cx).is_via_collab() {
253 return Err(anyhow!("only the host can synchronize contexts"));
254 }
255
256 let mut local_versions = Vec::new();
257 for remote_version_proto in envelope.payload.contexts {
258 let remote_version = ContextVersion::from_proto(&remote_version_proto);
259 let context_id = ContextId::from_proto(remote_version_proto.context_id);
260 if let Some(context) = this.loaded_context_for_id(&context_id, cx) {
261 let context = context.read(cx);
262 let operations = context.serialize_ops(&remote_version, cx);
263 local_versions.push(context.version(cx).to_proto(context_id.clone()));
264 let client = this.client.clone();
265 let project_id = envelope.payload.project_id;
266 cx.background_spawn(async move {
267 let operations = operations.await;
268 for operation in operations {
269 client.send(proto::UpdateContext {
270 project_id,
271 context_id: context_id.to_proto(),
272 operation: Some(operation),
273 })?;
274 }
275 anyhow::Ok(())
276 })
277 .detach_and_log_err(cx);
278 }
279 }
280
281 this.advertise_contexts(cx);
282
283 anyhow::Ok(proto::SynchronizeContextsResponse {
284 contexts: local_versions,
285 })
286 })?
287 }
288
289 fn handle_project_shared(&mut self, _: Entity<Project>, cx: &mut Context<Self>) {
290 let is_shared = self.project.read(cx).is_shared();
291 let was_shared = mem::replace(&mut self.project_is_shared, is_shared);
292 if is_shared == was_shared {
293 return;
294 }
295
296 if is_shared {
297 self.contexts.retain_mut(|context| {
298 if let Some(strong_context) = context.upgrade() {
299 *context = ContextHandle::Strong(strong_context);
300 true
301 } else {
302 false
303 }
304 });
305 let remote_id = self.project.read(cx).remote_id().unwrap();
306 self.client_subscription = self
307 .client
308 .subscribe_to_entity(remote_id)
309 .log_err()
310 .map(|subscription| subscription.set_entity(&cx.entity(), &mut cx.to_async()));
311 self.advertise_contexts(cx);
312 } else {
313 self.client_subscription = None;
314 }
315 }
316
317 fn handle_project_event(
318 &mut self,
319 project: Entity<Project>,
320 event: &project::Event,
321 cx: &mut Context<Self>,
322 ) {
323 match event {
324 project::Event::RemoteIdChanged(_) => {
325 self.handle_project_shared(project, cx);
326 }
327 project::Event::Reshared => {
328 self.advertise_contexts(cx);
329 }
330 project::Event::HostReshared | project::Event::Rejoined => {
331 self.synchronize_contexts(cx);
332 }
333 project::Event::DisconnectedFromHost => {
334 self.contexts.retain_mut(|context| {
335 if let Some(strong_context) = context.upgrade() {
336 *context = ContextHandle::Weak(context.downgrade());
337 strong_context.update(cx, |context, cx| {
338 if context.replica_id() != ReplicaId::default() {
339 context.set_capability(language::Capability::ReadOnly, cx);
340 }
341 });
342 true
343 } else {
344 false
345 }
346 });
347 self.host_contexts.clear();
348 cx.notify();
349 }
350 _ => {}
351 }
352 }
353
354 pub fn contexts(&self) -> Vec<SavedContextMetadata> {
355 let mut contexts = self.contexts_metadata.iter().cloned().collect::<Vec<_>>();
356 contexts.sort_unstable_by_key(|thread| std::cmp::Reverse(thread.mtime));
357 contexts
358 }
359
360 pub fn create(&mut self, cx: &mut Context<Self>) -> Entity<AssistantContext> {
361 let context = cx.new(|cx| {
362 AssistantContext::local(
363 self.languages.clone(),
364 Some(self.project.clone()),
365 Some(self.telemetry.clone()),
366 self.prompt_builder.clone(),
367 self.slash_commands.clone(),
368 cx,
369 )
370 });
371 self.register_context(&context, cx);
372 context
373 }
374
375 pub fn create_remote_context(
376 &mut self,
377 cx: &mut Context<Self>,
378 ) -> Task<Result<Entity<AssistantContext>>> {
379 let project = self.project.read(cx);
380 let Some(project_id) = project.remote_id() else {
381 return Task::ready(Err(anyhow!("project was not remote")));
382 };
383
384 let replica_id = project.replica_id();
385 let capability = project.capability();
386 let language_registry = self.languages.clone();
387 let project = self.project.clone();
388 let telemetry = self.telemetry.clone();
389 let prompt_builder = self.prompt_builder.clone();
390 let slash_commands = self.slash_commands.clone();
391 let request = self.client.request(proto::CreateContext { project_id });
392 cx.spawn(async move |this, cx| {
393 let response = request.await?;
394 let context_id = ContextId::from_proto(response.context_id);
395 let context_proto = response.context.context("invalid context")?;
396 let context = cx.new(|cx| {
397 AssistantContext::new(
398 context_id.clone(),
399 replica_id,
400 capability,
401 language_registry,
402 prompt_builder,
403 slash_commands,
404 Some(project),
405 Some(telemetry),
406 cx,
407 )
408 })?;
409 let operations = cx
410 .background_spawn(async move {
411 context_proto
412 .operations
413 .into_iter()
414 .map(ContextOperation::from_proto)
415 .collect::<Result<Vec<_>>>()
416 })
417 .await?;
418 context.update(cx, |context, cx| context.apply_ops(operations, cx))?;
419 this.update(cx, |this, cx| {
420 if let Some(existing_context) = this.loaded_context_for_id(&context_id, cx) {
421 existing_context
422 } else {
423 this.register_context(&context, cx);
424 this.synchronize_contexts(cx);
425 context
426 }
427 })
428 })
429 }
430
431 pub fn open_local_context(
432 &mut self,
433 path: PathBuf,
434 cx: &Context<Self>,
435 ) -> Task<Result<Entity<AssistantContext>>> {
436 if let Some(existing_context) = self.loaded_context_for_path(&path, cx) {
437 return Task::ready(Ok(existing_context));
438 }
439
440 let fs = self.fs.clone();
441 let languages = self.languages.clone();
442 let project = self.project.clone();
443 let telemetry = self.telemetry.clone();
444 let load = cx.background_spawn({
445 let path = path.clone();
446 async move {
447 let saved_context = fs.load(&path).await?;
448 SavedContext::from_json(&saved_context)
449 }
450 });
451 let prompt_builder = self.prompt_builder.clone();
452 let slash_commands = self.slash_commands.clone();
453
454 cx.spawn(async move |this, cx| {
455 let saved_context = load.await?;
456 let context = cx.new(|cx| {
457 AssistantContext::deserialize(
458 saved_context,
459 path.clone(),
460 languages,
461 prompt_builder,
462 slash_commands,
463 Some(project),
464 Some(telemetry),
465 cx,
466 )
467 })?;
468 this.update(cx, |this, cx| {
469 if let Some(existing_context) = this.loaded_context_for_path(&path, cx) {
470 existing_context
471 } else {
472 this.register_context(&context, cx);
473 context
474 }
475 })
476 })
477 }
478
479 pub fn delete_local_context(
480 &mut self,
481 path: PathBuf,
482 cx: &mut Context<Self>,
483 ) -> Task<Result<()>> {
484 let fs = self.fs.clone();
485
486 cx.spawn(async move |this, cx| {
487 fs.remove_file(
488 &path,
489 RemoveOptions {
490 recursive: false,
491 ignore_if_not_exists: true,
492 },
493 )
494 .await?;
495
496 this.update(cx, |this, cx| {
497 this.contexts.retain(|context| {
498 context
499 .upgrade()
500 .and_then(|context| context.read(cx).path())
501 != Some(&path)
502 });
503 this.contexts_metadata
504 .retain(|context| context.path != path);
505 })?;
506
507 Ok(())
508 })
509 }
510
511 fn loaded_context_for_path(&self, path: &Path, cx: &App) -> Option<Entity<AssistantContext>> {
512 self.contexts.iter().find_map(|context| {
513 let context = context.upgrade()?;
514 if context.read(cx).path() == Some(path) {
515 Some(context)
516 } else {
517 None
518 }
519 })
520 }
521
522 pub fn loaded_context_for_id(
523 &self,
524 id: &ContextId,
525 cx: &App,
526 ) -> Option<Entity<AssistantContext>> {
527 self.contexts.iter().find_map(|context| {
528 let context = context.upgrade()?;
529 if context.read(cx).id() == id {
530 Some(context)
531 } else {
532 None
533 }
534 })
535 }
536
537 pub fn open_remote_context(
538 &mut self,
539 context_id: ContextId,
540 cx: &mut Context<Self>,
541 ) -> Task<Result<Entity<AssistantContext>>> {
542 let project = self.project.read(cx);
543 let Some(project_id) = project.remote_id() else {
544 return Task::ready(Err(anyhow!("project was not remote")));
545 };
546
547 if let Some(context) = self.loaded_context_for_id(&context_id, cx) {
548 return Task::ready(Ok(context));
549 }
550
551 let replica_id = project.replica_id();
552 let capability = project.capability();
553 let language_registry = self.languages.clone();
554 let project = self.project.clone();
555 let telemetry = self.telemetry.clone();
556 let request = self.client.request(proto::OpenContext {
557 project_id,
558 context_id: context_id.to_proto(),
559 });
560 let prompt_builder = self.prompt_builder.clone();
561 let slash_commands = self.slash_commands.clone();
562 cx.spawn(async move |this, cx| {
563 let response = request.await?;
564 let context_proto = response.context.context("invalid context")?;
565 let context = cx.new(|cx| {
566 AssistantContext::new(
567 context_id.clone(),
568 replica_id,
569 capability,
570 language_registry,
571 prompt_builder,
572 slash_commands,
573 Some(project),
574 Some(telemetry),
575 cx,
576 )
577 })?;
578 let operations = cx
579 .background_spawn(async move {
580 context_proto
581 .operations
582 .into_iter()
583 .map(ContextOperation::from_proto)
584 .collect::<Result<Vec<_>>>()
585 })
586 .await?;
587 context.update(cx, |context, cx| context.apply_ops(operations, cx))?;
588 this.update(cx, |this, cx| {
589 if let Some(existing_context) = this.loaded_context_for_id(&context_id, cx) {
590 existing_context
591 } else {
592 this.register_context(&context, cx);
593 this.synchronize_contexts(cx);
594 context
595 }
596 })
597 })
598 }
599
600 fn register_context(&mut self, context: &Entity<AssistantContext>, cx: &mut Context<Self>) {
601 let handle = if self.project_is_shared {
602 ContextHandle::Strong(context.clone())
603 } else {
604 ContextHandle::Weak(context.downgrade())
605 };
606 self.contexts.push(handle);
607 self.advertise_contexts(cx);
608 cx.subscribe(context, Self::handle_context_event).detach();
609 }
610
611 fn handle_context_event(
612 &mut self,
613 context: Entity<AssistantContext>,
614 event: &ContextEvent,
615 cx: &mut Context<Self>,
616 ) {
617 let Some(project_id) = self.project.read(cx).remote_id() else {
618 return;
619 };
620
621 match event {
622 ContextEvent::SummaryChanged => {
623 self.advertise_contexts(cx);
624 }
625 ContextEvent::Operation(operation) => {
626 let context_id = context.read(cx).id().to_proto();
627 let operation = operation.to_proto();
628 self.client
629 .send(proto::UpdateContext {
630 project_id,
631 context_id,
632 operation: Some(operation),
633 })
634 .log_err();
635 }
636 _ => {}
637 }
638 }
639
640 fn advertise_contexts(&self, cx: &App) {
641 let Some(project_id) = self.project.read(cx).remote_id() else {
642 return;
643 };
644
645 // For now, only the host can advertise their open contexts.
646 if self.project.read(cx).is_via_collab() {
647 return;
648 }
649
650 let contexts = self
651 .contexts
652 .iter()
653 .rev()
654 .filter_map(|context| {
655 let context = context.upgrade()?.read(cx);
656 if context.replica_id() == ReplicaId::default() {
657 Some(proto::ContextMetadata {
658 context_id: context.id().to_proto(),
659 summary: context.summary().map(|summary| summary.text.clone()),
660 })
661 } else {
662 None
663 }
664 })
665 .collect();
666 self.client
667 .send(proto::AdvertiseContexts {
668 project_id,
669 contexts,
670 })
671 .ok();
672 }
673
674 fn synchronize_contexts(&mut self, cx: &mut Context<Self>) {
675 let Some(project_id) = self.project.read(cx).remote_id() else {
676 return;
677 };
678
679 let contexts = self
680 .contexts
681 .iter()
682 .filter_map(|context| {
683 let context = context.upgrade()?.read(cx);
684 if context.replica_id() != ReplicaId::default() {
685 Some(context.version(cx).to_proto(context.id().clone()))
686 } else {
687 None
688 }
689 })
690 .collect();
691
692 let client = self.client.clone();
693 let request = self.client.request(proto::SynchronizeContexts {
694 project_id,
695 contexts,
696 });
697 cx.spawn(async move |this, cx| {
698 let response = request.await?;
699
700 let mut context_ids = Vec::new();
701 let mut operations = Vec::new();
702 this.read_with(cx, |this, cx| {
703 for context_version_proto in response.contexts {
704 let context_version = ContextVersion::from_proto(&context_version_proto);
705 let context_id = ContextId::from_proto(context_version_proto.context_id);
706 if let Some(context) = this.loaded_context_for_id(&context_id, cx) {
707 context_ids.push(context_id);
708 operations.push(context.read(cx).serialize_ops(&context_version, cx));
709 }
710 }
711 })?;
712
713 let operations = futures::future::join_all(operations).await;
714 for (context_id, operations) in context_ids.into_iter().zip(operations) {
715 for operation in operations {
716 client.send(proto::UpdateContext {
717 project_id,
718 context_id: context_id.to_proto(),
719 operation: Some(operation),
720 })?;
721 }
722 }
723
724 anyhow::Ok(())
725 })
726 .detach_and_log_err(cx);
727 }
728
729 pub fn search(&self, query: String, cx: &App) -> Task<Vec<SavedContextMetadata>> {
730 let metadata = self.contexts_metadata.clone();
731 let executor = cx.background_executor().clone();
732 cx.background_spawn(async move {
733 if query.is_empty() {
734 metadata
735 } else {
736 let candidates = metadata
737 .iter()
738 .enumerate()
739 .map(|(id, metadata)| StringMatchCandidate::new(id, &metadata.title))
740 .collect::<Vec<_>>();
741 let matches = fuzzy::match_strings(
742 &candidates,
743 &query,
744 false,
745 100,
746 &Default::default(),
747 executor,
748 )
749 .await;
750
751 matches
752 .into_iter()
753 .map(|mat| metadata[mat.candidate_id].clone())
754 .collect()
755 }
756 })
757 }
758
759 pub fn host_contexts(&self) -> &[RemoteContextMetadata] {
760 &self.host_contexts
761 }
762
763 fn reload(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
764 let fs = self.fs.clone();
765 cx.spawn(async move |this, cx| {
766 fs.create_dir(contexts_dir()).await?;
767
768 let mut paths = fs.read_dir(contexts_dir()).await?;
769 let mut contexts = Vec::<SavedContextMetadata>::new();
770 while let Some(path) = paths.next().await {
771 let path = path?;
772 if path.extension() != Some(OsStr::new("json")) {
773 continue;
774 }
775
776 static ASSISTANT_CONTEXT_REGEX: LazyLock<Regex> =
777 LazyLock::new(|| Regex::new(r" - \d+.zed.json$").unwrap());
778
779 let metadata = fs.metadata(&path).await?;
780 if let Some((file_name, metadata)) = path
781 .file_name()
782 .and_then(|name| name.to_str())
783 .zip(metadata)
784 {
785 // This is used to filter out contexts saved by the new assistant.
786 if !ASSISTANT_CONTEXT_REGEX.is_match(file_name) {
787 continue;
788 }
789
790 if let Some(title) = ASSISTANT_CONTEXT_REGEX
791 .replace(file_name, "")
792 .lines()
793 .next()
794 {
795 contexts.push(SavedContextMetadata {
796 title: title.to_string(),
797 path,
798 mtime: metadata.mtime.timestamp_for_user().into(),
799 });
800 }
801 }
802 }
803 contexts.sort_unstable_by_key(|context| Reverse(context.mtime));
804
805 this.update(cx, |this, cx| {
806 this.contexts_metadata = contexts;
807 cx.notify();
808 })
809 })
810 }
811
812 pub fn restart_context_servers(&mut self, cx: &mut Context<Self>) {
813 cx.update_entity(
814 &self.context_server_manager,
815 |context_server_manager, cx| {
816 for server in context_server_manager.running_servers() {
817 context_server_manager
818 .restart_server(&server.id(), cx)
819 .detach_and_log_err(cx);
820 }
821 },
822 );
823 }
824
825 fn register_context_server_handlers(&self, cx: &mut Context<Self>) {
826 cx.subscribe(
827 &self.context_server_manager.clone(),
828 Self::handle_context_server_event,
829 )
830 .detach();
831 }
832
    /// Keeps the slash command working set in line with context server lifecycle events.
    ///
    /// - `ServerStarted`: if the server is known to the manager, has a client, and
    ///   reports the prompts capability, its acceptable prompts are inserted into the
    ///   working set as slash commands and the resulting ids are recorded under the
    ///   server id.
    /// - `ServerStopped`: the ids recorded for that server, if any, are removed from the
    ///   working set.
    fn handle_context_server_event(
        &mut self,
        context_server_manager: Entity<ContextServerManager>,
        event: &context_server::manager::Event,
        cx: &mut Context<Self>,
    ) {
        let slash_command_working_set = self.slash_commands.clone();
        match event {
            context_server::manager::Event::ServerStarted { server_id } => {
                if let Some(server) = context_server_manager.read(cx).get_server(server_id) {
                    let context_server_manager = context_server_manager.clone();
                    // Listing prompts is asynchronous, so registration happens in a
                    // detached task.
                    cx.spawn({
                        let server = server.clone();
                        let server_id = server_id.clone();
                        async move |this, cx| {
                            // Nothing to register without a client for the server.
                            let Some(protocol) = server.client() else {
                                return;
                            };

                            if protocol.capable(context_server::protocol::ServerCapability::Prompts) {
                                // A failed prompt listing is logged and skipped.
                                if let Some(prompts) = protocol.list_prompts().await.log_err() {
                                    let slash_command_ids = prompts
                                        .into_iter()
                                        .filter(assistant_slash_commands::acceptable_prompt)
                                        .map(|prompt| {
                                            log::info!(
                                                "registering context server command: {:?}",
                                                prompt.name
                                            );
                                            slash_command_working_set.insert(Arc::new(
                                                assistant_slash_commands::ContextServerSlashCommand::new(
                                                    context_server_manager.clone(),
                                                    &server,
                                                    prompt,
                                                ),
                                            ))
                                        })
                                        .collect::<Vec<_>>();

                                    // Record the ids so `ServerStopped` can remove them.
                                    // NOTE(review): if the server stops before this runs,
                                    // these ids are recorded after the stop event was
                                    // handled — confirm whether that can happen.
                                    this.update( cx, |this, _cx| {
                                        this.context_server_slash_command_ids
                                            .insert(server_id.clone(), slash_command_ids);
                                    })
                                    .log_err();
                                }
                            }
                        }
                    })
                    .detach();
                }
            }
            context_server::manager::Event::ServerStopped { server_id } => {
                if let Some(slash_command_ids) =
                    self.context_server_slash_command_ids.remove(server_id)
                {
                    slash_command_working_set.remove(&slash_command_ids);
                }
            }
        }
    }
893}