1use crate::slash_command::context_server_command;
2use crate::{
3 prompts::PromptBuilder, slash_command_working_set::SlashCommandWorkingSet, Context,
4 ContextEvent, ContextId, ContextOperation, ContextVersion, SavedContext, SavedContextMetadata,
5};
6use crate::{tools, SlashCommandId, ToolId, ToolWorkingSet};
7use anyhow::{anyhow, Context as _, Result};
8use client::{proto, telemetry::Telemetry, Client, TypedEnvelope};
9use clock::ReplicaId;
10use collections::HashMap;
11use command_palette_hooks::CommandPaletteFilter;
12use context_servers::manager::{ContextServerManager, ContextServerSettings};
13use context_servers::{ContextServerFactoryRegistry, CONTEXT_SERVERS_NAMESPACE};
14use fs::Fs;
15use futures::StreamExt;
16use fuzzy::StringMatchCandidate;
17use gpui::{
18 AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
19};
20use language::LanguageRegistry;
21use paths::contexts_dir;
22use project::Project;
23use regex::Regex;
24use rpc::AnyProtoClient;
25use settings::{Settings as _, SettingsStore};
26use std::{
27 cmp::Reverse,
28 ffi::OsStr,
29 mem,
30 path::{Path, PathBuf},
31 sync::Arc,
32 time::Duration,
33};
34use util::{ResultExt, TryFutureExt};
35
/// Registers the [`ContextStore`] RPC handlers on `client`.
///
/// `AdvertiseContexts` and `UpdateContext` are registered as message handlers;
/// `OpenContext`, `CreateContext` and `SynchronizeContexts` are registered as
/// request handlers (their handler functions return a response value).
pub fn init(client: &AnyProtoClient) {
    client.add_model_message_handler(ContextStore::handle_advertise_contexts);
    client.add_model_request_handler(ContextStore::handle_open_context);
    client.add_model_request_handler(ContextStore::handle_create_context);
    client.add_model_message_handler(ContextStore::handle_update_context);
    client.add_model_request_handler(ContextStore::handle_synchronize_contexts);
}
43
/// Identifier and optional summary of a context advertised by another peer.
///
/// Values are built from the entries of an `AdvertiseContexts` message in
/// `ContextStore::handle_advertise_contexts`.
#[derive(Clone)]
pub struct RemoteContextMetadata {
    /// Identifier of the advertised context.
    pub id: ContextId,
    /// Summary sent along with the advertisement, if there was one.
    pub summary: Option<String>,
}
49
/// Keeps track of the contexts that belong to a project: the ones loaded in
/// memory, the metadata of the ones saved on disk, and the ones advertised by
/// the host. It also owns the context server manager and the slash commands
/// and tools registered on behalf of each context server.
pub struct ContextStore {
    /// Handles to every context registered through `register_context`.
    /// Handles are strong while the project is shared and weak otherwise.
    contexts: Vec<ContextHandle>,
    /// Metadata of the saved contexts found in `contexts_dir()`; replaced on
    /// every `reload`, ordered by descending `mtime`.
    contexts_metadata: Vec<SavedContextMetadata>,
    /// Manager for the context servers used by this store.
    context_server_manager: Model<ContextServerManager>,
    /// Slash command ids registered per context server id, removed again when
    /// that server stops.
    context_server_slash_command_ids: HashMap<Arc<str>, Vec<SlashCommandId>>,
    /// Tool ids registered per context server id, removed again when that
    /// server stops.
    context_server_tool_ids: HashMap<Arc<str>, Vec<ToolId>>,
    /// Contexts most recently received in an `AdvertiseContexts` message;
    /// cleared on `DisconnectedFromHost`.
    host_contexts: Vec<RemoteContextMetadata>,
    fs: Arc<dyn Fs>,
    languages: Arc<LanguageRegistry>,
    slash_commands: Arc<SlashCommandWorkingSet>,
    tools: Arc<ToolWorkingSet>,
    telemetry: Arc<Telemetry>,
    /// Task that calls `reload` whenever the watcher on `contexts_dir()`
    /// yields an event.
    _watch_updates: Task<Option<()>>,
    client: Arc<Client>,
    project: Model<Project>,
    /// Value of `project.is_shared()` as of the last `handle_project_changed`.
    project_is_shared: bool,
    /// Entity subscription on the client; set while the project is shared and
    /// reset to `None` when it stops being shared.
    client_subscription: Option<client::Subscription>,
    /// Observation of, and event subscription to, `project`.
    _project_subscriptions: Vec<gpui::Subscription>,
    prompt_builder: Arc<PromptBuilder>,
}
70
/// Events emitted by [`ContextStore`].
pub enum ContextStoreEvent {
    /// A context with the given id was created in response to a
    /// `CreateContext` request (see `handle_create_context`).
    ContextCreated(ContextId),
}
74
// Allows `ContextStore` to emit `ContextStoreEvent`s through its model context.
impl EventEmitter<ContextStoreEvent> for ContextStore {}
76
/// A handle to a loaded context that is either weak or strong.
///
/// `ContextStore::register_context` stores a strong handle while the project
/// is shared and a weak one otherwise.
enum ContextHandle {
    /// Weak handle; upgrading it may fail.
    Weak(WeakModel<Context>),
    /// Strong handle to the context.
    Strong(Model<Context>),
}
81
82impl ContextHandle {
83 fn upgrade(&self) -> Option<Model<Context>> {
84 match self {
85 ContextHandle::Weak(weak) => weak.upgrade(),
86 ContextHandle::Strong(strong) => Some(strong.clone()),
87 }
88 }
89
90 fn downgrade(&self) -> WeakModel<Context> {
91 match self {
92 ContextHandle::Weak(weak) => weak.clone(),
93 ContextHandle::Strong(strong) => strong.downgrade(),
94 }
95 }
96}
97
98impl ContextStore {
/// Creates a `ContextStore` for `project`.
///
/// The returned task resolves to the store model once the store has been
/// constructed, the saved-context metadata has been loaded for the first time
/// (a failure there is only logged), and the settings observer for context
/// servers has been installed.
pub fn new(
    project: Model<Project>,
    prompt_builder: Arc<PromptBuilder>,
    slash_commands: Arc<SlashCommandWorkingSet>,
    tools: Arc<ToolWorkingSet>,
    cx: &mut AppContext,
) -> Task<Result<Model<Self>>> {
    let fs = project.read(cx).fs().clone();
    let languages = project.read(cx).languages().clone();
    let telemetry = project.read(cx).client().telemetry().clone();
    cx.spawn(|mut cx| async move {
        const CONTEXT_WATCH_DURATION: Duration = Duration::from_millis(100);
        // Start watching the directory that holds the saved contexts.
        let (mut events, _) = fs.watch(contexts_dir(), CONTEXT_WATCH_DURATION).await;

        let this = cx.new_model(|cx: &mut ModelContext<Self>| {
            let context_server_manager = cx.new_model(|_cx| ContextServerManager::new());
            let mut this = Self {
                contexts: Vec::new(),
                contexts_metadata: Vec::new(),
                context_server_manager,
                context_server_slash_command_ids: HashMap::default(),
                context_server_tool_ids: HashMap::default(),
                host_contexts: Vec::new(),
                fs,
                languages,
                slash_commands,
                tools,
                telemetry,
                // Reload the saved-context metadata on every watcher event. A failed
                // reload is logged and the loop continues; the task ends when the
                // event stream ends or the store can no longer be updated.
                _watch_updates: cx.spawn(|this, mut cx| {
                    async move {
                        while events.next().await.is_some() {
                            this.update(&mut cx, |this, cx| this.reload(cx))?
                                .await
                                .log_err();
                        }
                        anyhow::Ok(())
                    }
                    .log_err()
                }),
                client_subscription: None,
                _project_subscriptions: vec![
                    cx.observe(&project, Self::handle_project_changed),
                    cx.subscribe(&project, Self::handle_project_event),
                ],
                project_is_shared: false,
                client: project.read(cx).client(),
                project: project.clone(),
                prompt_builder,
            };
            // Bring the freshly built store in line with the project's current
            // sharing state before it is handed out.
            this.handle_project_changed(project, cx);
            this.synchronize_contexts(cx);
            this.register_context_server_handlers(cx);

            // TODO: At the time when we construct the `ContextStore` we may not have yet initialized the extensions.
            // In order to register the context servers when the extension is loaded, we're periodically looping to
            // see if there are context servers to register.
            //
            // I tried doing this in a subscription on the `ExtensionStore`, but it never seemed to fire.
            //
            // We should find a more elegant way to do this.
            let context_server_factory_registry =
                ContextServerFactoryRegistry::default_global(cx);
            cx.spawn(|context_store, mut cx| async move {
                loop {
                    // Build a server from every registered factory; factories that
                    // fail are logged and skipped.
                    let mut servers_to_register = Vec::new();
                    for (_id, factory) in
                        context_server_factory_registry.context_server_factories()
                    {
                        if let Some(server) = factory(&cx).await.log_err() {
                            servers_to_register.push(server);
                        }
                    }

                    // Stop polling once the store can no longer be updated.
                    let Some(_) = context_store
                        .update(&mut cx, |this, cx| {
                            this.context_server_manager.update(cx, |this, cx| {
                                for server in servers_to_register {
                                    this.add_server(server, cx).detach_and_log_err(cx);
                                }
                            })
                        })
                        .log_err()
                    else {
                        break;
                    };

                    smol::Timer::after(Duration::from_millis(100)).await;
                }

                anyhow::Ok(())
            })
            .detach_and_log_err(cx);

            this
        })?;
        // Initial load of the saved-context metadata; errors are logged only.
        this.update(&mut cx, |this, cx| this.reload(cx))?
            .await
            .log_err();

        this.update(&mut cx, |this, cx| {
            this.watch_context_server_settings(cx);
        })
        .log_err();

        Ok(this)
    })
}
206
207 fn watch_context_server_settings(&self, cx: &mut ModelContext<Self>) {
208 cx.observe_global::<SettingsStore>(move |this, cx| {
209 this.context_server_manager.update(cx, |manager, cx| {
210 let location = this.project.read(cx).worktrees(cx).next().map(|worktree| {
211 settings::SettingsLocation {
212 worktree_id: worktree.read(cx).id(),
213 path: Path::new(""),
214 }
215 });
216 let settings = ContextServerSettings::get(location, cx);
217
218 manager.maintain_servers(settings, cx);
219
220 let has_any_context_servers = !manager.servers().is_empty();
221 CommandPaletteFilter::update_global(cx, |filter, _cx| {
222 if has_any_context_servers {
223 filter.show_namespace(CONTEXT_SERVERS_NAMESPACE);
224 } else {
225 filter.hide_namespace(CONTEXT_SERVERS_NAMESPACE);
226 }
227 });
228 })
229 })
230 .detach();
231 }
232
233 async fn handle_advertise_contexts(
234 this: Model<Self>,
235 envelope: TypedEnvelope<proto::AdvertiseContexts>,
236 mut cx: AsyncAppContext,
237 ) -> Result<()> {
238 this.update(&mut cx, |this, cx| {
239 this.host_contexts = envelope
240 .payload
241 .contexts
242 .into_iter()
243 .map(|context| RemoteContextMetadata {
244 id: ContextId::from_proto(context.context_id),
245 summary: context.summary,
246 })
247 .collect();
248 cx.notify();
249 })
250 }
251
252 async fn handle_open_context(
253 this: Model<Self>,
254 envelope: TypedEnvelope<proto::OpenContext>,
255 mut cx: AsyncAppContext,
256 ) -> Result<proto::OpenContextResponse> {
257 let context_id = ContextId::from_proto(envelope.payload.context_id);
258 let operations = this.update(&mut cx, |this, cx| {
259 if this.project.read(cx).is_via_collab() {
260 return Err(anyhow!("only the host contexts can be opened"));
261 }
262
263 let context = this
264 .loaded_context_for_id(&context_id, cx)
265 .context("context not found")?;
266 if context.read(cx).replica_id() != ReplicaId::default() {
267 return Err(anyhow!("context must be opened via the host"));
268 }
269
270 anyhow::Ok(
271 context
272 .read(cx)
273 .serialize_ops(&ContextVersion::default(), cx),
274 )
275 })??;
276 let operations = operations.await;
277 Ok(proto::OpenContextResponse {
278 context: Some(proto::Context { operations }),
279 })
280 }
281
282 async fn handle_create_context(
283 this: Model<Self>,
284 _: TypedEnvelope<proto::CreateContext>,
285 mut cx: AsyncAppContext,
286 ) -> Result<proto::CreateContextResponse> {
287 let (context_id, operations) = this.update(&mut cx, |this, cx| {
288 if this.project.read(cx).is_via_collab() {
289 return Err(anyhow!("can only create contexts as the host"));
290 }
291
292 let context = this.create(cx);
293 let context_id = context.read(cx).id().clone();
294 cx.emit(ContextStoreEvent::ContextCreated(context_id.clone()));
295
296 anyhow::Ok((
297 context_id,
298 context
299 .read(cx)
300 .serialize_ops(&ContextVersion::default(), cx),
301 ))
302 })??;
303 let operations = operations.await;
304 Ok(proto::CreateContextResponse {
305 context_id: context_id.to_proto(),
306 context: Some(proto::Context { operations }),
307 })
308 }
309
310 async fn handle_update_context(
311 this: Model<Self>,
312 envelope: TypedEnvelope<proto::UpdateContext>,
313 mut cx: AsyncAppContext,
314 ) -> Result<()> {
315 this.update(&mut cx, |this, cx| {
316 let context_id = ContextId::from_proto(envelope.payload.context_id);
317 if let Some(context) = this.loaded_context_for_id(&context_id, cx) {
318 let operation_proto = envelope.payload.operation.context("invalid operation")?;
319 let operation = ContextOperation::from_proto(operation_proto)?;
320 context.update(cx, |context, cx| context.apply_ops([operation], cx));
321 }
322 Ok(())
323 })?
324 }
325
/// Handles a `SynchronizeContexts` request.
///
/// For every context version in the request whose context is loaded here, the
/// local version is added to the response and the operations serialized
/// relative to the requester's version are sent to the client as individual
/// `UpdateContext` messages from a background task. Contexts that are not
/// loaded are skipped. Finally the store re-advertises its contexts.
///
/// # Errors
///
/// Fails when this project is joined via collab.
async fn handle_synchronize_contexts(
    this: Model<Self>,
    envelope: TypedEnvelope<proto::SynchronizeContexts>,
    mut cx: AsyncAppContext,
) -> Result<proto::SynchronizeContextsResponse> {
    this.update(&mut cx, |this, cx| {
        if this.project.read(cx).is_via_collab() {
            return Err(anyhow!("only the host can synchronize contexts"));
        }

        let mut local_versions = Vec::new();
        for remote_version_proto in envelope.payload.contexts {
            let remote_version = ContextVersion::from_proto(&remote_version_proto);
            let context_id = ContextId::from_proto(remote_version_proto.context_id);
            if let Some(context) = this.loaded_context_for_id(&context_id, cx) {
                let context = context.read(cx);
                // Serialization is started here and awaited in the background task below.
                let operations = context.serialize_ops(&remote_version, cx);
                local_versions.push(context.version(cx).to_proto(context_id.clone()));
                let client = this.client.clone();
                let project_id = envelope.payload.project_id;
                cx.background_executor()
                    .spawn(async move {
                        let operations = operations.await;
                        // The first failed send aborts the remaining sends for this
                        // context; the error is logged by `detach_and_log_err`.
                        for operation in operations {
                            client.send(proto::UpdateContext {
                                project_id,
                                context_id: context_id.to_proto(),
                                operation: Some(operation),
                            })?;
                        }
                        anyhow::Ok(())
                    })
                    .detach_and_log_err(cx);
            }
        }

        this.advertise_contexts(cx);

        anyhow::Ok(proto::SynchronizeContextsResponse {
            contexts: local_versions,
        })
    })?
}
369
370 fn handle_project_changed(&mut self, _: Model<Project>, cx: &mut ModelContext<Self>) {
371 let is_shared = self.project.read(cx).is_shared();
372 let was_shared = mem::replace(&mut self.project_is_shared, is_shared);
373 if is_shared == was_shared {
374 return;
375 }
376
377 if is_shared {
378 self.contexts.retain_mut(|context| {
379 if let Some(strong_context) = context.upgrade() {
380 *context = ContextHandle::Strong(strong_context);
381 true
382 } else {
383 false
384 }
385 });
386 let remote_id = self.project.read(cx).remote_id().unwrap();
387 self.client_subscription = self
388 .client
389 .subscribe_to_entity(remote_id)
390 .log_err()
391 .map(|subscription| subscription.set_model(&cx.handle(), &mut cx.to_async()));
392 self.advertise_contexts(cx);
393 } else {
394 self.client_subscription = None;
395 }
396 }
397
398 fn handle_project_event(
399 &mut self,
400 _: Model<Project>,
401 event: &project::Event,
402 cx: &mut ModelContext<Self>,
403 ) {
404 match event {
405 project::Event::Reshared => {
406 self.advertise_contexts(cx);
407 }
408 project::Event::HostReshared | project::Event::Rejoined => {
409 self.synchronize_contexts(cx);
410 }
411 project::Event::DisconnectedFromHost => {
412 self.contexts.retain_mut(|context| {
413 if let Some(strong_context) = context.upgrade() {
414 *context = ContextHandle::Weak(context.downgrade());
415 strong_context.update(cx, |context, cx| {
416 if context.replica_id() != ReplicaId::default() {
417 context.set_capability(language::Capability::ReadOnly, cx);
418 }
419 });
420 true
421 } else {
422 false
423 }
424 });
425 self.host_contexts.clear();
426 cx.notify();
427 }
428 _ => {}
429 }
430 }
431
432 pub fn create(&mut self, cx: &mut ModelContext<Self>) -> Model<Context> {
433 let context = cx.new_model(|cx| {
434 Context::local(
435 self.languages.clone(),
436 Some(self.project.clone()),
437 Some(self.telemetry.clone()),
438 self.prompt_builder.clone(),
439 self.slash_commands.clone(),
440 self.tools.clone(),
441 cx,
442 )
443 });
444 self.register_context(&context, cx);
445 context
446 }
447
448 pub fn create_remote_context(
449 &mut self,
450 cx: &mut ModelContext<Self>,
451 ) -> Task<Result<Model<Context>>> {
452 let project = self.project.read(cx);
453 let Some(project_id) = project.remote_id() else {
454 return Task::ready(Err(anyhow!("project was not remote")));
455 };
456
457 let replica_id = project.replica_id();
458 let capability = project.capability();
459 let language_registry = self.languages.clone();
460 let project = self.project.clone();
461 let telemetry = self.telemetry.clone();
462 let prompt_builder = self.prompt_builder.clone();
463 let slash_commands = self.slash_commands.clone();
464 let tools = self.tools.clone();
465 let request = self.client.request(proto::CreateContext { project_id });
466 cx.spawn(|this, mut cx| async move {
467 let response = request.await?;
468 let context_id = ContextId::from_proto(response.context_id);
469 let context_proto = response.context.context("invalid context")?;
470 let context = cx.new_model(|cx| {
471 Context::new(
472 context_id.clone(),
473 replica_id,
474 capability,
475 language_registry,
476 prompt_builder,
477 slash_commands,
478 tools,
479 Some(project),
480 Some(telemetry),
481 cx,
482 )
483 })?;
484 let operations = cx
485 .background_executor()
486 .spawn(async move {
487 context_proto
488 .operations
489 .into_iter()
490 .map(ContextOperation::from_proto)
491 .collect::<Result<Vec<_>>>()
492 })
493 .await?;
494 context.update(&mut cx, |context, cx| context.apply_ops(operations, cx))?;
495 this.update(&mut cx, |this, cx| {
496 if let Some(existing_context) = this.loaded_context_for_id(&context_id, cx) {
497 existing_context
498 } else {
499 this.register_context(&context, cx);
500 this.synchronize_contexts(cx);
501 context
502 }
503 })
504 })
505 }
506
507 pub fn open_local_context(
508 &mut self,
509 path: PathBuf,
510 cx: &ModelContext<Self>,
511 ) -> Task<Result<Model<Context>>> {
512 if let Some(existing_context) = self.loaded_context_for_path(&path, cx) {
513 return Task::ready(Ok(existing_context));
514 }
515
516 let fs = self.fs.clone();
517 let languages = self.languages.clone();
518 let project = self.project.clone();
519 let telemetry = self.telemetry.clone();
520 let load = cx.background_executor().spawn({
521 let path = path.clone();
522 async move {
523 let saved_context = fs.load(&path).await?;
524 SavedContext::from_json(&saved_context)
525 }
526 });
527 let prompt_builder = self.prompt_builder.clone();
528 let slash_commands = self.slash_commands.clone();
529 let tools = self.tools.clone();
530
531 cx.spawn(|this, mut cx| async move {
532 let saved_context = load.await?;
533 let context = cx.new_model(|cx| {
534 Context::deserialize(
535 saved_context,
536 path.clone(),
537 languages,
538 prompt_builder,
539 slash_commands,
540 tools,
541 Some(project),
542 Some(telemetry),
543 cx,
544 )
545 })?;
546 this.update(&mut cx, |this, cx| {
547 if let Some(existing_context) = this.loaded_context_for_path(&path, cx) {
548 existing_context
549 } else {
550 this.register_context(&context, cx);
551 context
552 }
553 })
554 })
555 }
556
557 fn loaded_context_for_path(&self, path: &Path, cx: &AppContext) -> Option<Model<Context>> {
558 self.contexts.iter().find_map(|context| {
559 let context = context.upgrade()?;
560 if context.read(cx).path() == Some(path) {
561 Some(context)
562 } else {
563 None
564 }
565 })
566 }
567
568 pub(super) fn loaded_context_for_id(
569 &self,
570 id: &ContextId,
571 cx: &AppContext,
572 ) -> Option<Model<Context>> {
573 self.contexts.iter().find_map(|context| {
574 let context = context.upgrade()?;
575 if context.read(cx).id() == id {
576 Some(context)
577 } else {
578 None
579 }
580 })
581 }
582
583 pub fn open_remote_context(
584 &mut self,
585 context_id: ContextId,
586 cx: &mut ModelContext<Self>,
587 ) -> Task<Result<Model<Context>>> {
588 let project = self.project.read(cx);
589 let Some(project_id) = project.remote_id() else {
590 return Task::ready(Err(anyhow!("project was not remote")));
591 };
592
593 if let Some(context) = self.loaded_context_for_id(&context_id, cx) {
594 return Task::ready(Ok(context));
595 }
596
597 let replica_id = project.replica_id();
598 let capability = project.capability();
599 let language_registry = self.languages.clone();
600 let project = self.project.clone();
601 let telemetry = self.telemetry.clone();
602 let request = self.client.request(proto::OpenContext {
603 project_id,
604 context_id: context_id.to_proto(),
605 });
606 let prompt_builder = self.prompt_builder.clone();
607 let slash_commands = self.slash_commands.clone();
608 let tools = self.tools.clone();
609 cx.spawn(|this, mut cx| async move {
610 let response = request.await?;
611 let context_proto = response.context.context("invalid context")?;
612 let context = cx.new_model(|cx| {
613 Context::new(
614 context_id.clone(),
615 replica_id,
616 capability,
617 language_registry,
618 prompt_builder,
619 slash_commands,
620 tools,
621 Some(project),
622 Some(telemetry),
623 cx,
624 )
625 })?;
626 let operations = cx
627 .background_executor()
628 .spawn(async move {
629 context_proto
630 .operations
631 .into_iter()
632 .map(ContextOperation::from_proto)
633 .collect::<Result<Vec<_>>>()
634 })
635 .await?;
636 context.update(&mut cx, |context, cx| context.apply_ops(operations, cx))?;
637 this.update(&mut cx, |this, cx| {
638 if let Some(existing_context) = this.loaded_context_for_id(&context_id, cx) {
639 existing_context
640 } else {
641 this.register_context(&context, cx);
642 this.synchronize_contexts(cx);
643 context
644 }
645 })
646 })
647 }
648
649 fn register_context(&mut self, context: &Model<Context>, cx: &mut ModelContext<Self>) {
650 let handle = if self.project_is_shared {
651 ContextHandle::Strong(context.clone())
652 } else {
653 ContextHandle::Weak(context.downgrade())
654 };
655 self.contexts.push(handle);
656 self.advertise_contexts(cx);
657 cx.subscribe(context, Self::handle_context_event).detach();
658 }
659
660 fn handle_context_event(
661 &mut self,
662 context: Model<Context>,
663 event: &ContextEvent,
664 cx: &mut ModelContext<Self>,
665 ) {
666 let Some(project_id) = self.project.read(cx).remote_id() else {
667 return;
668 };
669
670 match event {
671 ContextEvent::SummaryChanged => {
672 self.advertise_contexts(cx);
673 }
674 ContextEvent::Operation(operation) => {
675 let context_id = context.read(cx).id().to_proto();
676 let operation = operation.to_proto();
677 self.client
678 .send(proto::UpdateContext {
679 project_id,
680 context_id,
681 operation: Some(operation),
682 })
683 .log_err();
684 }
685 _ => {}
686 }
687 }
688
689 fn advertise_contexts(&self, cx: &AppContext) {
690 let Some(project_id) = self.project.read(cx).remote_id() else {
691 return;
692 };
693
694 // For now, only the host can advertise their open contexts.
695 if self.project.read(cx).is_via_collab() {
696 return;
697 }
698
699 let contexts = self
700 .contexts
701 .iter()
702 .rev()
703 .filter_map(|context| {
704 let context = context.upgrade()?.read(cx);
705 if context.replica_id() == ReplicaId::default() {
706 Some(proto::ContextMetadata {
707 context_id: context.id().to_proto(),
708 summary: context.summary().map(|summary| summary.text.clone()),
709 })
710 } else {
711 None
712 }
713 })
714 .collect();
715 self.client
716 .send(proto::AdvertiseContexts {
717 project_id,
718 contexts,
719 })
720 .ok();
721 }
722
/// Sends a `SynchronizeContexts` request and pushes local operations in
/// response to the versions that come back.
///
/// The request lists the versions of all loaded contexts whose replica id is
/// not the default one. For every version in the response whose context is
/// still loaded, the operations serialized relative to that version are sent
/// as individual `UpdateContext` messages. Does nothing when the project has
/// no remote id; errors in the spawned task are logged.
fn synchronize_contexts(&mut self, cx: &mut ModelContext<Self>) {
    let Some(project_id) = self.project.read(cx).remote_id() else {
        return;
    };

    let contexts = self
        .contexts
        .iter()
        .filter_map(|context| {
            let context = context.upgrade()?.read(cx);
            if context.replica_id() != ReplicaId::default() {
                Some(context.version(cx).to_proto(context.id().clone()))
            } else {
                None
            }
        })
        .collect();

    let client = self.client.clone();
    let request = self.client.request(proto::SynchronizeContexts {
        project_id,
        contexts,
    });
    cx.spawn(|this, cx| async move {
        let response = request.await?;

        // `context_ids` and `operations` are filled in lockstep so they can be
        // zipped back together once the serialization futures have completed.
        let mut context_ids = Vec::new();
        let mut operations = Vec::new();
        this.read_with(&cx, |this, cx| {
            for context_version_proto in response.contexts {
                let context_version = ContextVersion::from_proto(&context_version_proto);
                let context_id = ContextId::from_proto(context_version_proto.context_id);
                if let Some(context) = this.loaded_context_for_id(&context_id, cx) {
                    context_ids.push(context_id);
                    operations.push(context.read(cx).serialize_ops(&context_version, cx));
                }
            }
        })?;

        let operations = futures::future::join_all(operations).await;
        for (context_id, operations) in context_ids.into_iter().zip(operations) {
            // The first failed send aborts all remaining sends.
            for operation in operations {
                client.send(proto::UpdateContext {
                    project_id,
                    context_id: context_id.to_proto(),
                    operation: Some(operation),
                })?;
            }
        }

        anyhow::Ok(())
    })
    .detach_and_log_err(cx);
}
777
778 pub fn search(&self, query: String, cx: &AppContext) -> Task<Vec<SavedContextMetadata>> {
779 let metadata = self.contexts_metadata.clone();
780 let executor = cx.background_executor().clone();
781 cx.background_executor().spawn(async move {
782 if query.is_empty() {
783 metadata
784 } else {
785 let candidates = metadata
786 .iter()
787 .enumerate()
788 .map(|(id, metadata)| StringMatchCandidate::new(id, metadata.title.clone()))
789 .collect::<Vec<_>>();
790 let matches = fuzzy::match_strings(
791 &candidates,
792 &query,
793 false,
794 100,
795 &Default::default(),
796 executor,
797 )
798 .await;
799
800 matches
801 .into_iter()
802 .map(|mat| metadata[mat.candidate_id].clone())
803 .collect()
804 }
805 })
806 }
807
808 pub fn host_contexts(&self) -> &[RemoteContextMetadata] {
809 &self.host_contexts
810 }
811
812 fn reload(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
813 let fs = self.fs.clone();
814 cx.spawn(|this, mut cx| async move {
815 fs.create_dir(contexts_dir()).await?;
816
817 let mut paths = fs.read_dir(contexts_dir()).await?;
818 let mut contexts = Vec::<SavedContextMetadata>::new();
819 while let Some(path) = paths.next().await {
820 let path = path?;
821 if path.extension() != Some(OsStr::new("json")) {
822 continue;
823 }
824
825 let pattern = r" - \d+.zed.json$";
826 let re = Regex::new(pattern).unwrap();
827
828 let metadata = fs.metadata(&path).await?;
829 if let Some((file_name, metadata)) = path
830 .file_name()
831 .and_then(|name| name.to_str())
832 .zip(metadata)
833 {
834 // This is used to filter out contexts saved by the new assistant.
835 if !re.is_match(file_name) {
836 continue;
837 }
838
839 if let Some(title) = re.replace(file_name, "").lines().next() {
840 contexts.push(SavedContextMetadata {
841 title: title.to_string(),
842 path,
843 mtime: metadata.mtime.into(),
844 });
845 }
846 }
847 }
848 contexts.sort_unstable_by_key(|context| Reverse(context.mtime));
849
850 this.update(&mut cx, |this, cx| {
851 this.contexts_metadata = contexts;
852 cx.notify();
853 })
854 })
855 }
856
857 pub fn restart_context_servers(&mut self, cx: &mut ModelContext<Self>) {
858 cx.update_model(
859 &self.context_server_manager,
860 |context_server_manager, cx| {
861 for server in context_server_manager.servers() {
862 context_server_manager
863 .restart_server(&server.id(), cx)
864 .detach_and_log_err(cx);
865 }
866 },
867 );
868 }
869
870 fn register_context_server_handlers(&self, cx: &mut ModelContext<Self>) {
871 cx.subscribe(
872 &self.context_server_manager.clone(),
873 Self::handle_context_server_event,
874 )
875 .detach();
876 }
877
/// Keeps the slash command and tool working sets in step with the context
/// servers.
///
/// * `ServerStarted`: if the manager knows the server and the server has a
///   client, a detached task lists the server's prompts and tools (each only
///   when the corresponding capability is reported), inserts a slash command
///   per acceptable prompt and a tool per listed tool into the working sets,
///   and records the resulting ids under the server id.
/// * `ServerStopped`: the ids recorded for the server are removed from the
///   working sets.
///
/// Listing failures and failures to update the store are logged.
fn handle_context_server_event(
    &mut self,
    context_server_manager: Model<ContextServerManager>,
    event: &context_servers::manager::Event,
    cx: &mut ModelContext<Self>,
) {
    let slash_command_working_set = self.slash_commands.clone();
    let tool_working_set = self.tools.clone();
    match event {
        context_servers::manager::Event::ServerStarted { server_id } => {
            if let Some(server) = context_server_manager.read(cx).get_server(server_id) {
                let context_server_manager = context_server_manager.clone();
                cx.spawn({
                    let server = server.clone();
                    let server_id = server_id.clone();
                    |this, mut cx| async move {
                        // Without a client there is nothing to query.
                        let Some(protocol) = server.client() else {
                            return;
                        };

                        if protocol.capable(context_servers::protocol::ServerCapability::Prompts) {
                            if let Some(prompts) = protocol.list_prompts().await.log_err() {
                                // Only prompts accepted by `acceptable_prompt` become
                                // slash commands.
                                let slash_command_ids = prompts
                                    .into_iter()
                                    .filter(context_server_command::acceptable_prompt)
                                    .map(|prompt| {
                                        log::info!(
                                            "registering context server command: {:?}",
                                            prompt.name
                                        );
                                        slash_command_working_set.insert(Arc::new(
                                            context_server_command::ContextServerSlashCommand::new(
                                                context_server_manager.clone(),
                                                &server,
                                                prompt,
                                            ),
                                        ))
                                    })
                                    .collect::<Vec<_>>();

                                // Remember the ids so they can be removed when the
                                // server stops.
                                this.update(&mut cx, |this, _cx| {
                                    this.context_server_slash_command_ids
                                        .insert(server_id.clone(), slash_command_ids);
                                })
                                .log_err();
                            }
                        }

                        if protocol.capable(context_servers::protocol::ServerCapability::Tools) {
                            if let Some(tools) = protocol.list_tools().await.log_err() {
                                let tool_ids = tools.tools.into_iter().map(|tool| {
                                    log::info!("registering context server tool: {:?}", tool.name);
                                    tool_working_set.insert(
                                        Arc::new(tools::context_server_tool::ContextServerTool::new(
                                            context_server_manager.clone(),
                                            server.id(),
                                            tool,
                                        )),
                                    )

                                }).collect::<Vec<_>>();


                                // Remember the ids so they can be removed when the
                                // server stops.
                                this.update(&mut cx, |this, _cx| {
                                    this.context_server_tool_ids
                                        .insert(server_id, tool_ids);
                                })
                                .log_err();
                            }
                        }
                    }
                })
                .detach();
            }
        }
        context_servers::manager::Event::ServerStopped { server_id } => {
            // Unregister whatever was recorded for this server; nothing is done
            // for a kind that has no recorded ids.
            if let Some(slash_command_ids) =
                self.context_server_slash_command_ids.remove(server_id)
            {
                slash_command_working_set.remove(&slash_command_ids);
            }

            if let Some(tool_ids) = self.context_server_tool_ids.remove(server_id) {
                tool_working_set.remove(&tool_ids);
            }
        }
    }
}
966}