1use crate::{
2 AssistantContext, ContextEvent, ContextId, ContextOperation, ContextVersion, SavedContext,
3 SavedContextMetadata,
4};
5use anyhow::{anyhow, Context as _, Result};
6use assistant_slash_command::{SlashCommandId, SlashCommandWorkingSet};
7use client::{proto, telemetry::Telemetry, Client, TypedEnvelope};
8use clock::ReplicaId;
9use collections::HashMap;
10use context_server::manager::ContextServerManager;
11use context_server::ContextServerFactoryRegistry;
12use fs::Fs;
13use futures::StreamExt;
14use fuzzy::StringMatchCandidate;
15use gpui::{App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Task, WeakEntity};
16use language::LanguageRegistry;
17use paths::contexts_dir;
18use project::Project;
19use prompt_library::PromptBuilder;
20use regex::Regex;
21use rpc::AnyProtoClient;
22use std::sync::LazyLock;
23use std::{
24 cmp::Reverse,
25 ffi::OsStr,
26 mem,
27 path::{Path, PathBuf},
28 sync::Arc,
29 time::Duration,
30};
31use util::{ResultExt, TryFutureExt};
32
33pub(crate) fn init(client: &AnyProtoClient) {
34 client.add_entity_message_handler(ContextStore::handle_advertise_contexts);
35 client.add_entity_request_handler(ContextStore::handle_open_context);
36 client.add_entity_request_handler(ContextStore::handle_create_context);
37 client.add_entity_message_handler(ContextStore::handle_update_context);
38 client.add_entity_request_handler(ContextStore::handle_synchronize_contexts);
39}
40
41#[derive(Clone)]
42pub struct RemoteContextMetadata {
43 pub id: ContextId,
44 pub summary: Option<String>,
45}
46
47pub struct ContextStore {
48 contexts: Vec<ContextHandle>,
49 contexts_metadata: Vec<SavedContextMetadata>,
50 context_server_manager: Entity<ContextServerManager>,
51 context_server_slash_command_ids: HashMap<Arc<str>, Vec<SlashCommandId>>,
52 host_contexts: Vec<RemoteContextMetadata>,
53 fs: Arc<dyn Fs>,
54 languages: Arc<LanguageRegistry>,
55 slash_commands: Arc<SlashCommandWorkingSet>,
56 telemetry: Arc<Telemetry>,
57 _watch_updates: Task<Option<()>>,
58 client: Arc<Client>,
59 project: Entity<Project>,
60 project_is_shared: bool,
61 client_subscription: Option<client::Subscription>,
62 _project_subscriptions: Vec<gpui::Subscription>,
63 prompt_builder: Arc<PromptBuilder>,
64}
65
66pub enum ContextStoreEvent {
67 ContextCreated(ContextId),
68}
69
70impl EventEmitter<ContextStoreEvent> for ContextStore {}
71
72enum ContextHandle {
73 Weak(WeakEntity<AssistantContext>),
74 Strong(Entity<AssistantContext>),
75}
76
77impl ContextHandle {
78 fn upgrade(&self) -> Option<Entity<AssistantContext>> {
79 match self {
80 ContextHandle::Weak(weak) => weak.upgrade(),
81 ContextHandle::Strong(strong) => Some(strong.clone()),
82 }
83 }
84
85 fn downgrade(&self) -> WeakEntity<AssistantContext> {
86 match self {
87 ContextHandle::Weak(weak) => weak.clone(),
88 ContextHandle::Strong(strong) => strong.downgrade(),
89 }
90 }
91}
92
93impl ContextStore {
94 pub fn new(
95 project: Entity<Project>,
96 prompt_builder: Arc<PromptBuilder>,
97 slash_commands: Arc<SlashCommandWorkingSet>,
98 cx: &mut App,
99 ) -> Task<Result<Entity<Self>>> {
100 let fs = project.read(cx).fs().clone();
101 let languages = project.read(cx).languages().clone();
102 let telemetry = project.read(cx).client().telemetry().clone();
103 cx.spawn(|mut cx| async move {
104 const CONTEXT_WATCH_DURATION: Duration = Duration::from_millis(100);
105 let (mut events, _) = fs.watch(contexts_dir(), CONTEXT_WATCH_DURATION).await;
106
107 let this = cx.new(|cx: &mut Context<Self>| {
108 let context_server_factory_registry =
109 ContextServerFactoryRegistry::default_global(cx);
110 let context_server_manager = cx.new(|cx| {
111 ContextServerManager::new(context_server_factory_registry, project.clone(), cx)
112 });
113 let mut this = Self {
114 contexts: Vec::new(),
115 contexts_metadata: Vec::new(),
116 context_server_manager,
117 context_server_slash_command_ids: HashMap::default(),
118 host_contexts: Vec::new(),
119 fs,
120 languages,
121 slash_commands,
122 telemetry,
123 _watch_updates: cx.spawn(|this, mut cx| {
124 async move {
125 while events.next().await.is_some() {
126 this.update(&mut cx, |this, cx| this.reload(cx))?
127 .await
128 .log_err();
129 }
130 anyhow::Ok(())
131 }
132 .log_err()
133 }),
134 client_subscription: None,
135 _project_subscriptions: vec![
136 cx.observe(&project, Self::handle_project_changed),
137 cx.subscribe(&project, Self::handle_project_event),
138 ],
139 project_is_shared: false,
140 client: project.read(cx).client(),
141 project: project.clone(),
142 prompt_builder,
143 };
144 this.handle_project_changed(project.clone(), cx);
145 this.synchronize_contexts(cx);
146 this.register_context_server_handlers(cx);
147 this.reload(cx).detach_and_log_err(cx);
148 this
149 })?;
150
151 Ok(this)
152 })
153 }
154
155 async fn handle_advertise_contexts(
156 this: Entity<Self>,
157 envelope: TypedEnvelope<proto::AdvertiseContexts>,
158 mut cx: AsyncApp,
159 ) -> Result<()> {
160 this.update(&mut cx, |this, cx| {
161 this.host_contexts = envelope
162 .payload
163 .contexts
164 .into_iter()
165 .map(|context| RemoteContextMetadata {
166 id: ContextId::from_proto(context.context_id),
167 summary: context.summary,
168 })
169 .collect();
170 cx.notify();
171 })
172 }
173
174 async fn handle_open_context(
175 this: Entity<Self>,
176 envelope: TypedEnvelope<proto::OpenContext>,
177 mut cx: AsyncApp,
178 ) -> Result<proto::OpenContextResponse> {
179 let context_id = ContextId::from_proto(envelope.payload.context_id);
180 let operations = this.update(&mut cx, |this, cx| {
181 if this.project.read(cx).is_via_collab() {
182 return Err(anyhow!("only the host contexts can be opened"));
183 }
184
185 let context = this
186 .loaded_context_for_id(&context_id, cx)
187 .context("context not found")?;
188 if context.read(cx).replica_id() != ReplicaId::default() {
189 return Err(anyhow!("context must be opened via the host"));
190 }
191
192 anyhow::Ok(
193 context
194 .read(cx)
195 .serialize_ops(&ContextVersion::default(), cx),
196 )
197 })??;
198 let operations = operations.await;
199 Ok(proto::OpenContextResponse {
200 context: Some(proto::Context { operations }),
201 })
202 }
203
204 async fn handle_create_context(
205 this: Entity<Self>,
206 _: TypedEnvelope<proto::CreateContext>,
207 mut cx: AsyncApp,
208 ) -> Result<proto::CreateContextResponse> {
209 let (context_id, operations) = this.update(&mut cx, |this, cx| {
210 if this.project.read(cx).is_via_collab() {
211 return Err(anyhow!("can only create contexts as the host"));
212 }
213
214 let context = this.create(cx);
215 let context_id = context.read(cx).id().clone();
216 cx.emit(ContextStoreEvent::ContextCreated(context_id.clone()));
217
218 anyhow::Ok((
219 context_id,
220 context
221 .read(cx)
222 .serialize_ops(&ContextVersion::default(), cx),
223 ))
224 })??;
225 let operations = operations.await;
226 Ok(proto::CreateContextResponse {
227 context_id: context_id.to_proto(),
228 context: Some(proto::Context { operations }),
229 })
230 }
231
232 async fn handle_update_context(
233 this: Entity<Self>,
234 envelope: TypedEnvelope<proto::UpdateContext>,
235 mut cx: AsyncApp,
236 ) -> Result<()> {
237 this.update(&mut cx, |this, cx| {
238 let context_id = ContextId::from_proto(envelope.payload.context_id);
239 if let Some(context) = this.loaded_context_for_id(&context_id, cx) {
240 let operation_proto = envelope.payload.operation.context("invalid operation")?;
241 let operation = ContextOperation::from_proto(operation_proto)?;
242 context.update(cx, |context, cx| context.apply_ops([operation], cx));
243 }
244 Ok(())
245 })?
246 }
247
248 async fn handle_synchronize_contexts(
249 this: Entity<Self>,
250 envelope: TypedEnvelope<proto::SynchronizeContexts>,
251 mut cx: AsyncApp,
252 ) -> Result<proto::SynchronizeContextsResponse> {
253 this.update(&mut cx, |this, cx| {
254 if this.project.read(cx).is_via_collab() {
255 return Err(anyhow!("only the host can synchronize contexts"));
256 }
257
258 let mut local_versions = Vec::new();
259 for remote_version_proto in envelope.payload.contexts {
260 let remote_version = ContextVersion::from_proto(&remote_version_proto);
261 let context_id = ContextId::from_proto(remote_version_proto.context_id);
262 if let Some(context) = this.loaded_context_for_id(&context_id, cx) {
263 let context = context.read(cx);
264 let operations = context.serialize_ops(&remote_version, cx);
265 local_versions.push(context.version(cx).to_proto(context_id.clone()));
266 let client = this.client.clone();
267 let project_id = envelope.payload.project_id;
268 cx.background_spawn(async move {
269 let operations = operations.await;
270 for operation in operations {
271 client.send(proto::UpdateContext {
272 project_id,
273 context_id: context_id.to_proto(),
274 operation: Some(operation),
275 })?;
276 }
277 anyhow::Ok(())
278 })
279 .detach_and_log_err(cx);
280 }
281 }
282
283 this.advertise_contexts(cx);
284
285 anyhow::Ok(proto::SynchronizeContextsResponse {
286 contexts: local_versions,
287 })
288 })?
289 }
290
291 fn handle_project_changed(&mut self, _: Entity<Project>, cx: &mut Context<Self>) {
292 let is_shared = self.project.read(cx).is_shared();
293 let was_shared = mem::replace(&mut self.project_is_shared, is_shared);
294 if is_shared == was_shared {
295 return;
296 }
297
298 if is_shared {
299 self.contexts.retain_mut(|context| {
300 if let Some(strong_context) = context.upgrade() {
301 *context = ContextHandle::Strong(strong_context);
302 true
303 } else {
304 false
305 }
306 });
307 let remote_id = self.project.read(cx).remote_id().unwrap();
308 self.client_subscription = self
309 .client
310 .subscribe_to_entity(remote_id)
311 .log_err()
312 .map(|subscription| subscription.set_entity(&cx.entity(), &mut cx.to_async()));
313 self.advertise_contexts(cx);
314 } else {
315 self.client_subscription = None;
316 }
317 }
318
319 fn handle_project_event(
320 &mut self,
321 _: Entity<Project>,
322 event: &project::Event,
323 cx: &mut Context<Self>,
324 ) {
325 match event {
326 project::Event::Reshared => {
327 self.advertise_contexts(cx);
328 }
329 project::Event::HostReshared | project::Event::Rejoined => {
330 self.synchronize_contexts(cx);
331 }
332 project::Event::DisconnectedFromHost => {
333 self.contexts.retain_mut(|context| {
334 if let Some(strong_context) = context.upgrade() {
335 *context = ContextHandle::Weak(context.downgrade());
336 strong_context.update(cx, |context, cx| {
337 if context.replica_id() != ReplicaId::default() {
338 context.set_capability(language::Capability::ReadOnly, cx);
339 }
340 });
341 true
342 } else {
343 false
344 }
345 });
346 self.host_contexts.clear();
347 cx.notify();
348 }
349 _ => {}
350 }
351 }
352
353 pub fn create(&mut self, cx: &mut Context<Self>) -> Entity<AssistantContext> {
354 let context = cx.new(|cx| {
355 AssistantContext::local(
356 self.languages.clone(),
357 Some(self.project.clone()),
358 Some(self.telemetry.clone()),
359 self.prompt_builder.clone(),
360 self.slash_commands.clone(),
361 cx,
362 )
363 });
364 self.register_context(&context, cx);
365 context
366 }
367
368 pub fn create_remote_context(
369 &mut self,
370 cx: &mut Context<Self>,
371 ) -> Task<Result<Entity<AssistantContext>>> {
372 let project = self.project.read(cx);
373 let Some(project_id) = project.remote_id() else {
374 return Task::ready(Err(anyhow!("project was not remote")));
375 };
376
377 let replica_id = project.replica_id();
378 let capability = project.capability();
379 let language_registry = self.languages.clone();
380 let project = self.project.clone();
381 let telemetry = self.telemetry.clone();
382 let prompt_builder = self.prompt_builder.clone();
383 let slash_commands = self.slash_commands.clone();
384 let request = self.client.request(proto::CreateContext { project_id });
385 cx.spawn(|this, mut cx| async move {
386 let response = request.await?;
387 let context_id = ContextId::from_proto(response.context_id);
388 let context_proto = response.context.context("invalid context")?;
389 let context = cx.new(|cx| {
390 AssistantContext::new(
391 context_id.clone(),
392 replica_id,
393 capability,
394 language_registry,
395 prompt_builder,
396 slash_commands,
397 Some(project),
398 Some(telemetry),
399 cx,
400 )
401 })?;
402 let operations = cx
403 .background_spawn(async move {
404 context_proto
405 .operations
406 .into_iter()
407 .map(ContextOperation::from_proto)
408 .collect::<Result<Vec<_>>>()
409 })
410 .await?;
411 context.update(&mut cx, |context, cx| context.apply_ops(operations, cx))?;
412 this.update(&mut cx, |this, cx| {
413 if let Some(existing_context) = this.loaded_context_for_id(&context_id, cx) {
414 existing_context
415 } else {
416 this.register_context(&context, cx);
417 this.synchronize_contexts(cx);
418 context
419 }
420 })
421 })
422 }
423
424 pub fn open_local_context(
425 &mut self,
426 path: PathBuf,
427 cx: &Context<Self>,
428 ) -> Task<Result<Entity<AssistantContext>>> {
429 if let Some(existing_context) = self.loaded_context_for_path(&path, cx) {
430 return Task::ready(Ok(existing_context));
431 }
432
433 let fs = self.fs.clone();
434 let languages = self.languages.clone();
435 let project = self.project.clone();
436 let telemetry = self.telemetry.clone();
437 let load = cx.background_spawn({
438 let path = path.clone();
439 async move {
440 let saved_context = fs.load(&path).await?;
441 SavedContext::from_json(&saved_context)
442 }
443 });
444 let prompt_builder = self.prompt_builder.clone();
445 let slash_commands = self.slash_commands.clone();
446
447 cx.spawn(|this, mut cx| async move {
448 let saved_context = load.await?;
449 let context = cx.new(|cx| {
450 AssistantContext::deserialize(
451 saved_context,
452 path.clone(),
453 languages,
454 prompt_builder,
455 slash_commands,
456 Some(project),
457 Some(telemetry),
458 cx,
459 )
460 })?;
461 this.update(&mut cx, |this, cx| {
462 if let Some(existing_context) = this.loaded_context_for_path(&path, cx) {
463 existing_context
464 } else {
465 this.register_context(&context, cx);
466 context
467 }
468 })
469 })
470 }
471
472 fn loaded_context_for_path(&self, path: &Path, cx: &App) -> Option<Entity<AssistantContext>> {
473 self.contexts.iter().find_map(|context| {
474 let context = context.upgrade()?;
475 if context.read(cx).path() == Some(path) {
476 Some(context)
477 } else {
478 None
479 }
480 })
481 }
482
483 pub fn loaded_context_for_id(
484 &self,
485 id: &ContextId,
486 cx: &App,
487 ) -> Option<Entity<AssistantContext>> {
488 self.contexts.iter().find_map(|context| {
489 let context = context.upgrade()?;
490 if context.read(cx).id() == id {
491 Some(context)
492 } else {
493 None
494 }
495 })
496 }
497
498 pub fn open_remote_context(
499 &mut self,
500 context_id: ContextId,
501 cx: &mut Context<Self>,
502 ) -> Task<Result<Entity<AssistantContext>>> {
503 let project = self.project.read(cx);
504 let Some(project_id) = project.remote_id() else {
505 return Task::ready(Err(anyhow!("project was not remote")));
506 };
507
508 if let Some(context) = self.loaded_context_for_id(&context_id, cx) {
509 return Task::ready(Ok(context));
510 }
511
512 let replica_id = project.replica_id();
513 let capability = project.capability();
514 let language_registry = self.languages.clone();
515 let project = self.project.clone();
516 let telemetry = self.telemetry.clone();
517 let request = self.client.request(proto::OpenContext {
518 project_id,
519 context_id: context_id.to_proto(),
520 });
521 let prompt_builder = self.prompt_builder.clone();
522 let slash_commands = self.slash_commands.clone();
523 cx.spawn(|this, mut cx| async move {
524 let response = request.await?;
525 let context_proto = response.context.context("invalid context")?;
526 let context = cx.new(|cx| {
527 AssistantContext::new(
528 context_id.clone(),
529 replica_id,
530 capability,
531 language_registry,
532 prompt_builder,
533 slash_commands,
534 Some(project),
535 Some(telemetry),
536 cx,
537 )
538 })?;
539 let operations = cx
540 .background_spawn(async move {
541 context_proto
542 .operations
543 .into_iter()
544 .map(ContextOperation::from_proto)
545 .collect::<Result<Vec<_>>>()
546 })
547 .await?;
548 context.update(&mut cx, |context, cx| context.apply_ops(operations, cx))?;
549 this.update(&mut cx, |this, cx| {
550 if let Some(existing_context) = this.loaded_context_for_id(&context_id, cx) {
551 existing_context
552 } else {
553 this.register_context(&context, cx);
554 this.synchronize_contexts(cx);
555 context
556 }
557 })
558 })
559 }
560
561 fn register_context(&mut self, context: &Entity<AssistantContext>, cx: &mut Context<Self>) {
562 let handle = if self.project_is_shared {
563 ContextHandle::Strong(context.clone())
564 } else {
565 ContextHandle::Weak(context.downgrade())
566 };
567 self.contexts.push(handle);
568 self.advertise_contexts(cx);
569 cx.subscribe(context, Self::handle_context_event).detach();
570 }
571
572 fn handle_context_event(
573 &mut self,
574 context: Entity<AssistantContext>,
575 event: &ContextEvent,
576 cx: &mut Context<Self>,
577 ) {
578 let Some(project_id) = self.project.read(cx).remote_id() else {
579 return;
580 };
581
582 match event {
583 ContextEvent::SummaryChanged => {
584 self.advertise_contexts(cx);
585 }
586 ContextEvent::Operation(operation) => {
587 let context_id = context.read(cx).id().to_proto();
588 let operation = operation.to_proto();
589 self.client
590 .send(proto::UpdateContext {
591 project_id,
592 context_id,
593 operation: Some(operation),
594 })
595 .log_err();
596 }
597 _ => {}
598 }
599 }
600
601 fn advertise_contexts(&self, cx: &App) {
602 let Some(project_id) = self.project.read(cx).remote_id() else {
603 return;
604 };
605
606 // For now, only the host can advertise their open contexts.
607 if self.project.read(cx).is_via_collab() {
608 return;
609 }
610
611 let contexts = self
612 .contexts
613 .iter()
614 .rev()
615 .filter_map(|context| {
616 let context = context.upgrade()?.read(cx);
617 if context.replica_id() == ReplicaId::default() {
618 Some(proto::ContextMetadata {
619 context_id: context.id().to_proto(),
620 summary: context.summary().map(|summary| summary.text.clone()),
621 })
622 } else {
623 None
624 }
625 })
626 .collect();
627 self.client
628 .send(proto::AdvertiseContexts {
629 project_id,
630 contexts,
631 })
632 .ok();
633 }
634
635 fn synchronize_contexts(&mut self, cx: &mut Context<Self>) {
636 let Some(project_id) = self.project.read(cx).remote_id() else {
637 return;
638 };
639
640 let contexts = self
641 .contexts
642 .iter()
643 .filter_map(|context| {
644 let context = context.upgrade()?.read(cx);
645 if context.replica_id() != ReplicaId::default() {
646 Some(context.version(cx).to_proto(context.id().clone()))
647 } else {
648 None
649 }
650 })
651 .collect();
652
653 let client = self.client.clone();
654 let request = self.client.request(proto::SynchronizeContexts {
655 project_id,
656 contexts,
657 });
658 cx.spawn(|this, cx| async move {
659 let response = request.await?;
660
661 let mut context_ids = Vec::new();
662 let mut operations = Vec::new();
663 this.read_with(&cx, |this, cx| {
664 for context_version_proto in response.contexts {
665 let context_version = ContextVersion::from_proto(&context_version_proto);
666 let context_id = ContextId::from_proto(context_version_proto.context_id);
667 if let Some(context) = this.loaded_context_for_id(&context_id, cx) {
668 context_ids.push(context_id);
669 operations.push(context.read(cx).serialize_ops(&context_version, cx));
670 }
671 }
672 })?;
673
674 let operations = futures::future::join_all(operations).await;
675 for (context_id, operations) in context_ids.into_iter().zip(operations) {
676 for operation in operations {
677 client.send(proto::UpdateContext {
678 project_id,
679 context_id: context_id.to_proto(),
680 operation: Some(operation),
681 })?;
682 }
683 }
684
685 anyhow::Ok(())
686 })
687 .detach_and_log_err(cx);
688 }
689
690 pub fn search(&self, query: String, cx: &App) -> Task<Vec<SavedContextMetadata>> {
691 let metadata = self.contexts_metadata.clone();
692 let executor = cx.background_executor().clone();
693 cx.background_spawn(async move {
694 if query.is_empty() {
695 metadata
696 } else {
697 let candidates = metadata
698 .iter()
699 .enumerate()
700 .map(|(id, metadata)| StringMatchCandidate::new(id, &metadata.title))
701 .collect::<Vec<_>>();
702 let matches = fuzzy::match_strings(
703 &candidates,
704 &query,
705 false,
706 100,
707 &Default::default(),
708 executor,
709 )
710 .await;
711
712 matches
713 .into_iter()
714 .map(|mat| metadata[mat.candidate_id].clone())
715 .collect()
716 }
717 })
718 }
719
720 pub fn host_contexts(&self) -> &[RemoteContextMetadata] {
721 &self.host_contexts
722 }
723
724 fn reload(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
725 let fs = self.fs.clone();
726 cx.spawn(|this, mut cx| async move {
727 fs.create_dir(contexts_dir()).await?;
728
729 let mut paths = fs.read_dir(contexts_dir()).await?;
730 let mut contexts = Vec::<SavedContextMetadata>::new();
731 while let Some(path) = paths.next().await {
732 let path = path?;
733 if path.extension() != Some(OsStr::new("json")) {
734 continue;
735 }
736
737 static ASSISTANT_CONTEXT_REGEX: LazyLock<Regex> =
738 LazyLock::new(|| Regex::new(r" - \d+.zed.json$").unwrap());
739
740 let metadata = fs.metadata(&path).await?;
741 if let Some((file_name, metadata)) = path
742 .file_name()
743 .and_then(|name| name.to_str())
744 .zip(metadata)
745 {
746 // This is used to filter out contexts saved by the new assistant.
747 if !ASSISTANT_CONTEXT_REGEX.is_match(file_name) {
748 continue;
749 }
750
751 if let Some(title) = ASSISTANT_CONTEXT_REGEX
752 .replace(file_name, "")
753 .lines()
754 .next()
755 {
756 contexts.push(SavedContextMetadata {
757 title: title.to_string(),
758 path,
759 mtime: metadata.mtime.timestamp_for_user().into(),
760 });
761 }
762 }
763 }
764 contexts.sort_unstable_by_key(|context| Reverse(context.mtime));
765
766 this.update(&mut cx, |this, cx| {
767 this.contexts_metadata = contexts;
768 cx.notify();
769 })
770 })
771 }
772
773 pub fn restart_context_servers(&mut self, cx: &mut Context<Self>) {
774 cx.update_entity(
775 &self.context_server_manager,
776 |context_server_manager, cx| {
777 for server in context_server_manager.servers() {
778 context_server_manager
779 .restart_server(&server.id(), cx)
780 .detach_and_log_err(cx);
781 }
782 },
783 );
784 }
785
786 fn register_context_server_handlers(&self, cx: &mut Context<Self>) {
787 cx.subscribe(
788 &self.context_server_manager.clone(),
789 Self::handle_context_server_event,
790 )
791 .detach();
792 }
793
794 fn handle_context_server_event(
795 &mut self,
796 context_server_manager: Entity<ContextServerManager>,
797 event: &context_server::manager::Event,
798 cx: &mut Context<Self>,
799 ) {
800 let slash_command_working_set = self.slash_commands.clone();
801 match event {
802 context_server::manager::Event::ServerStarted { server_id } => {
803 if let Some(server) = context_server_manager.read(cx).get_server(server_id) {
804 let context_server_manager = context_server_manager.clone();
805 cx.spawn({
806 let server = server.clone();
807 let server_id = server_id.clone();
808 |this, mut cx| async move {
809 let Some(protocol) = server.client() else {
810 return;
811 };
812
813 if protocol.capable(context_server::protocol::ServerCapability::Prompts) {
814 if let Some(prompts) = protocol.list_prompts().await.log_err() {
815 let slash_command_ids = prompts
816 .into_iter()
817 .filter(assistant_slash_commands::acceptable_prompt)
818 .map(|prompt| {
819 log::info!(
820 "registering context server command: {:?}",
821 prompt.name
822 );
823 slash_command_working_set.insert(Arc::new(
824 assistant_slash_commands::ContextServerSlashCommand::new(
825 context_server_manager.clone(),
826 &server,
827 prompt,
828 ),
829 ))
830 })
831 .collect::<Vec<_>>();
832
833 this.update(&mut cx, |this, _cx| {
834 this.context_server_slash_command_ids
835 .insert(server_id.clone(), slash_command_ids);
836 })
837 .log_err();
838 }
839 }
840 }
841 })
842 .detach();
843 }
844 }
845 context_server::manager::Event::ServerStopped { server_id } => {
846 if let Some(slash_command_ids) =
847 self.context_server_slash_command_ids.remove(server_id)
848 {
849 slash_command_working_set.remove(&slash_command_ids);
850 }
851 }
852 }
853 }
854}