context_store.rs

  1use crate::slash_command::context_server_command;
  2use crate::{
  3    prompts::PromptBuilder, slash_command_working_set::SlashCommandWorkingSet, Context,
  4    ContextEvent, ContextId, ContextOperation, ContextVersion, SavedContext, SavedContextMetadata,
  5};
  6use crate::{tools, SlashCommandId, ToolId, ToolWorkingSet};
  7use anyhow::{anyhow, Context as _, Result};
  8use client::{proto, telemetry::Telemetry, Client, TypedEnvelope};
  9use clock::ReplicaId;
 10use collections::HashMap;
 11use command_palette_hooks::CommandPaletteFilter;
 12use context_servers::manager::{ContextServerManager, ContextServerSettings};
 13use context_servers::{ContextServerFactoryRegistry, CONTEXT_SERVERS_NAMESPACE};
 14use fs::Fs;
 15use futures::StreamExt;
 16use fuzzy::StringMatchCandidate;
 17use gpui::{
 18    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
 19};
 20use language::LanguageRegistry;
 21use paths::contexts_dir;
 22use project::Project;
 23use regex::Regex;
 24use rpc::AnyProtoClient;
 25use settings::{Settings as _, SettingsStore};
 26use std::{
 27    cmp::Reverse,
 28    ffi::OsStr,
 29    mem,
 30    path::{Path, PathBuf},
 31    sync::Arc,
 32    time::Duration,
 33};
 34use util::{ResultExt, TryFutureExt};
 35
 36pub fn init(client: &AnyProtoClient) {
 37    client.add_model_message_handler(ContextStore::handle_advertise_contexts);
 38    client.add_model_request_handler(ContextStore::handle_open_context);
 39    client.add_model_request_handler(ContextStore::handle_create_context);
 40    client.add_model_message_handler(ContextStore::handle_update_context);
 41    client.add_model_request_handler(ContextStore::handle_synchronize_contexts);
 42}
 43
 44#[derive(Clone)]
 45pub struct RemoteContextMetadata {
 46    pub id: ContextId,
 47    pub summary: Option<String>,
 48}
 49
 50pub struct ContextStore {
 51    contexts: Vec<ContextHandle>,
 52    contexts_metadata: Vec<SavedContextMetadata>,
 53    context_server_manager: Model<ContextServerManager>,
 54    context_server_slash_command_ids: HashMap<Arc<str>, Vec<SlashCommandId>>,
 55    context_server_tool_ids: HashMap<Arc<str>, Vec<ToolId>>,
 56    host_contexts: Vec<RemoteContextMetadata>,
 57    fs: Arc<dyn Fs>,
 58    languages: Arc<LanguageRegistry>,
 59    slash_commands: Arc<SlashCommandWorkingSet>,
 60    tools: Arc<ToolWorkingSet>,
 61    telemetry: Arc<Telemetry>,
 62    _watch_updates: Task<Option<()>>,
 63    client: Arc<Client>,
 64    project: Model<Project>,
 65    project_is_shared: bool,
 66    client_subscription: Option<client::Subscription>,
 67    _project_subscriptions: Vec<gpui::Subscription>,
 68    prompt_builder: Arc<PromptBuilder>,
 69}
 70
 71pub enum ContextStoreEvent {
 72    ContextCreated(ContextId),
 73}
 74
 75impl EventEmitter<ContextStoreEvent> for ContextStore {}
 76
 77enum ContextHandle {
 78    Weak(WeakModel<Context>),
 79    Strong(Model<Context>),
 80}
 81
 82impl ContextHandle {
 83    fn upgrade(&self) -> Option<Model<Context>> {
 84        match self {
 85            ContextHandle::Weak(weak) => weak.upgrade(),
 86            ContextHandle::Strong(strong) => Some(strong.clone()),
 87        }
 88    }
 89
 90    fn downgrade(&self) -> WeakModel<Context> {
 91        match self {
 92            ContextHandle::Weak(weak) => weak.clone(),
 93            ContextHandle::Strong(strong) => strong.downgrade(),
 94        }
 95    }
 96}
 97
 98impl ContextStore {
 99    pub fn new(
100        project: Model<Project>,
101        prompt_builder: Arc<PromptBuilder>,
102        slash_commands: Arc<SlashCommandWorkingSet>,
103        tools: Arc<ToolWorkingSet>,
104        cx: &mut AppContext,
105    ) -> Task<Result<Model<Self>>> {
106        let fs = project.read(cx).fs().clone();
107        let languages = project.read(cx).languages().clone();
108        let telemetry = project.read(cx).client().telemetry().clone();
109        cx.spawn(|mut cx| async move {
110            const CONTEXT_WATCH_DURATION: Duration = Duration::from_millis(100);
111            let (mut events, _) = fs.watch(contexts_dir(), CONTEXT_WATCH_DURATION).await;
112
113            let this = cx.new_model(|cx: &mut ModelContext<Self>| {
114                let context_server_manager = cx.new_model(|_cx| ContextServerManager::new());
115                let mut this = Self {
116                    contexts: Vec::new(),
117                    contexts_metadata: Vec::new(),
118                    context_server_manager,
119                    context_server_slash_command_ids: HashMap::default(),
120                    context_server_tool_ids: HashMap::default(),
121                    host_contexts: Vec::new(),
122                    fs,
123                    languages,
124                    slash_commands,
125                    tools,
126                    telemetry,
127                    _watch_updates: cx.spawn(|this, mut cx| {
128                        async move {
129                            while events.next().await.is_some() {
130                                this.update(&mut cx, |this, cx| this.reload(cx))?
131                                    .await
132                                    .log_err();
133                            }
134                            anyhow::Ok(())
135                        }
136                        .log_err()
137                    }),
138                    client_subscription: None,
139                    _project_subscriptions: vec![
140                        cx.observe(&project, Self::handle_project_changed),
141                        cx.subscribe(&project, Self::handle_project_event),
142                    ],
143                    project_is_shared: false,
144                    client: project.read(cx).client(),
145                    project: project.clone(),
146                    prompt_builder,
147                };
148                this.handle_project_changed(project, cx);
149                this.synchronize_contexts(cx);
150                this.register_context_server_handlers(cx);
151
152                // TODO: At the time when we construct the `ContextStore` we may not have yet initialized the extensions.
153                // In order to register the context servers when the extension is loaded, we're periodically looping to
154                // see if there are context servers to register.
155                //
156                // I tried doing this in a subscription on the `ExtensionStore`, but it never seemed to fire.
157                //
158                // We should find a more elegant way to do this.
159                let context_server_factory_registry =
160                    ContextServerFactoryRegistry::default_global(cx);
161                cx.spawn(|context_store, mut cx| async move {
162                    loop {
163                        let mut servers_to_register = Vec::new();
164                        for (_id, factory) in
165                            context_server_factory_registry.context_server_factories()
166                        {
167                            if let Some(server) = factory(&cx).await.log_err() {
168                                servers_to_register.push(server);
169                            }
170                        }
171
172                        let Some(_) = context_store
173                            .update(&mut cx, |this, cx| {
174                                this.context_server_manager.update(cx, |this, cx| {
175                                    for server in servers_to_register {
176                                        this.add_server(server, cx).detach_and_log_err(cx);
177                                    }
178                                })
179                            })
180                            .log_err()
181                        else {
182                            break;
183                        };
184
185                        smol::Timer::after(Duration::from_millis(100)).await;
186                    }
187
188                    anyhow::Ok(())
189                })
190                .detach_and_log_err(cx);
191
192                this
193            })?;
194            this.update(&mut cx, |this, cx| this.reload(cx))?
195                .await
196                .log_err();
197
198            this.update(&mut cx, |this, cx| {
199                this.watch_context_server_settings(cx);
200            })
201            .log_err();
202
203            Ok(this)
204        })
205    }
206
207    fn watch_context_server_settings(&self, cx: &mut ModelContext<Self>) {
208        cx.observe_global::<SettingsStore>(move |this, cx| {
209            this.context_server_manager.update(cx, |manager, cx| {
210                let location = this.project.read(cx).worktrees(cx).next().map(|worktree| {
211                    settings::SettingsLocation {
212                        worktree_id: worktree.read(cx).id(),
213                        path: Path::new(""),
214                    }
215                });
216                let settings = ContextServerSettings::get(location, cx);
217
218                manager.maintain_servers(settings, cx);
219
220                let has_any_context_servers = !manager.servers().is_empty();
221                CommandPaletteFilter::update_global(cx, |filter, _cx| {
222                    if has_any_context_servers {
223                        filter.show_namespace(CONTEXT_SERVERS_NAMESPACE);
224                    } else {
225                        filter.hide_namespace(CONTEXT_SERVERS_NAMESPACE);
226                    }
227                });
228            })
229        })
230        .detach();
231    }
232
233    async fn handle_advertise_contexts(
234        this: Model<Self>,
235        envelope: TypedEnvelope<proto::AdvertiseContexts>,
236        mut cx: AsyncAppContext,
237    ) -> Result<()> {
238        this.update(&mut cx, |this, cx| {
239            this.host_contexts = envelope
240                .payload
241                .contexts
242                .into_iter()
243                .map(|context| RemoteContextMetadata {
244                    id: ContextId::from_proto(context.context_id),
245                    summary: context.summary,
246                })
247                .collect();
248            cx.notify();
249        })
250    }
251
252    async fn handle_open_context(
253        this: Model<Self>,
254        envelope: TypedEnvelope<proto::OpenContext>,
255        mut cx: AsyncAppContext,
256    ) -> Result<proto::OpenContextResponse> {
257        let context_id = ContextId::from_proto(envelope.payload.context_id);
258        let operations = this.update(&mut cx, |this, cx| {
259            if this.project.read(cx).is_via_collab() {
260                return Err(anyhow!("only the host contexts can be opened"));
261            }
262
263            let context = this
264                .loaded_context_for_id(&context_id, cx)
265                .context("context not found")?;
266            if context.read(cx).replica_id() != ReplicaId::default() {
267                return Err(anyhow!("context must be opened via the host"));
268            }
269
270            anyhow::Ok(
271                context
272                    .read(cx)
273                    .serialize_ops(&ContextVersion::default(), cx),
274            )
275        })??;
276        let operations = operations.await;
277        Ok(proto::OpenContextResponse {
278            context: Some(proto::Context { operations }),
279        })
280    }
281
282    async fn handle_create_context(
283        this: Model<Self>,
284        _: TypedEnvelope<proto::CreateContext>,
285        mut cx: AsyncAppContext,
286    ) -> Result<proto::CreateContextResponse> {
287        let (context_id, operations) = this.update(&mut cx, |this, cx| {
288            if this.project.read(cx).is_via_collab() {
289                return Err(anyhow!("can only create contexts as the host"));
290            }
291
292            let context = this.create(cx);
293            let context_id = context.read(cx).id().clone();
294            cx.emit(ContextStoreEvent::ContextCreated(context_id.clone()));
295
296            anyhow::Ok((
297                context_id,
298                context
299                    .read(cx)
300                    .serialize_ops(&ContextVersion::default(), cx),
301            ))
302        })??;
303        let operations = operations.await;
304        Ok(proto::CreateContextResponse {
305            context_id: context_id.to_proto(),
306            context: Some(proto::Context { operations }),
307        })
308    }
309
310    async fn handle_update_context(
311        this: Model<Self>,
312        envelope: TypedEnvelope<proto::UpdateContext>,
313        mut cx: AsyncAppContext,
314    ) -> Result<()> {
315        this.update(&mut cx, |this, cx| {
316            let context_id = ContextId::from_proto(envelope.payload.context_id);
317            if let Some(context) = this.loaded_context_for_id(&context_id, cx) {
318                let operation_proto = envelope.payload.operation.context("invalid operation")?;
319                let operation = ContextOperation::from_proto(operation_proto)?;
320                context.update(cx, |context, cx| context.apply_ops([operation], cx));
321            }
322            Ok(())
323        })?
324    }
325
326    async fn handle_synchronize_contexts(
327        this: Model<Self>,
328        envelope: TypedEnvelope<proto::SynchronizeContexts>,
329        mut cx: AsyncAppContext,
330    ) -> Result<proto::SynchronizeContextsResponse> {
331        this.update(&mut cx, |this, cx| {
332            if this.project.read(cx).is_via_collab() {
333                return Err(anyhow!("only the host can synchronize contexts"));
334            }
335
336            let mut local_versions = Vec::new();
337            for remote_version_proto in envelope.payload.contexts {
338                let remote_version = ContextVersion::from_proto(&remote_version_proto);
339                let context_id = ContextId::from_proto(remote_version_proto.context_id);
340                if let Some(context) = this.loaded_context_for_id(&context_id, cx) {
341                    let context = context.read(cx);
342                    let operations = context.serialize_ops(&remote_version, cx);
343                    local_versions.push(context.version(cx).to_proto(context_id.clone()));
344                    let client = this.client.clone();
345                    let project_id = envelope.payload.project_id;
346                    cx.background_executor()
347                        .spawn(async move {
348                            let operations = operations.await;
349                            for operation in operations {
350                                client.send(proto::UpdateContext {
351                                    project_id,
352                                    context_id: context_id.to_proto(),
353                                    operation: Some(operation),
354                                })?;
355                            }
356                            anyhow::Ok(())
357                        })
358                        .detach_and_log_err(cx);
359                }
360            }
361
362            this.advertise_contexts(cx);
363
364            anyhow::Ok(proto::SynchronizeContextsResponse {
365                contexts: local_versions,
366            })
367        })?
368    }
369
370    fn handle_project_changed(&mut self, _: Model<Project>, cx: &mut ModelContext<Self>) {
371        let is_shared = self.project.read(cx).is_shared();
372        let was_shared = mem::replace(&mut self.project_is_shared, is_shared);
373        if is_shared == was_shared {
374            return;
375        }
376
377        if is_shared {
378            self.contexts.retain_mut(|context| {
379                if let Some(strong_context) = context.upgrade() {
380                    *context = ContextHandle::Strong(strong_context);
381                    true
382                } else {
383                    false
384                }
385            });
386            let remote_id = self.project.read(cx).remote_id().unwrap();
387            self.client_subscription = self
388                .client
389                .subscribe_to_entity(remote_id)
390                .log_err()
391                .map(|subscription| subscription.set_model(&cx.handle(), &mut cx.to_async()));
392            self.advertise_contexts(cx);
393        } else {
394            self.client_subscription = None;
395        }
396    }
397
398    fn handle_project_event(
399        &mut self,
400        _: Model<Project>,
401        event: &project::Event,
402        cx: &mut ModelContext<Self>,
403    ) {
404        match event {
405            project::Event::Reshared => {
406                self.advertise_contexts(cx);
407            }
408            project::Event::HostReshared | project::Event::Rejoined => {
409                self.synchronize_contexts(cx);
410            }
411            project::Event::DisconnectedFromHost => {
412                self.contexts.retain_mut(|context| {
413                    if let Some(strong_context) = context.upgrade() {
414                        *context = ContextHandle::Weak(context.downgrade());
415                        strong_context.update(cx, |context, cx| {
416                            if context.replica_id() != ReplicaId::default() {
417                                context.set_capability(language::Capability::ReadOnly, cx);
418                            }
419                        });
420                        true
421                    } else {
422                        false
423                    }
424                });
425                self.host_contexts.clear();
426                cx.notify();
427            }
428            _ => {}
429        }
430    }
431
432    pub fn create(&mut self, cx: &mut ModelContext<Self>) -> Model<Context> {
433        let context = cx.new_model(|cx| {
434            Context::local(
435                self.languages.clone(),
436                Some(self.project.clone()),
437                Some(self.telemetry.clone()),
438                self.prompt_builder.clone(),
439                self.slash_commands.clone(),
440                self.tools.clone(),
441                cx,
442            )
443        });
444        self.register_context(&context, cx);
445        context
446    }
447
448    pub fn create_remote_context(
449        &mut self,
450        cx: &mut ModelContext<Self>,
451    ) -> Task<Result<Model<Context>>> {
452        let project = self.project.read(cx);
453        let Some(project_id) = project.remote_id() else {
454            return Task::ready(Err(anyhow!("project was not remote")));
455        };
456
457        let replica_id = project.replica_id();
458        let capability = project.capability();
459        let language_registry = self.languages.clone();
460        let project = self.project.clone();
461        let telemetry = self.telemetry.clone();
462        let prompt_builder = self.prompt_builder.clone();
463        let slash_commands = self.slash_commands.clone();
464        let tools = self.tools.clone();
465        let request = self.client.request(proto::CreateContext { project_id });
466        cx.spawn(|this, mut cx| async move {
467            let response = request.await?;
468            let context_id = ContextId::from_proto(response.context_id);
469            let context_proto = response.context.context("invalid context")?;
470            let context = cx.new_model(|cx| {
471                Context::new(
472                    context_id.clone(),
473                    replica_id,
474                    capability,
475                    language_registry,
476                    prompt_builder,
477                    slash_commands,
478                    tools,
479                    Some(project),
480                    Some(telemetry),
481                    cx,
482                )
483            })?;
484            let operations = cx
485                .background_executor()
486                .spawn(async move {
487                    context_proto
488                        .operations
489                        .into_iter()
490                        .map(ContextOperation::from_proto)
491                        .collect::<Result<Vec<_>>>()
492                })
493                .await?;
494            context.update(&mut cx, |context, cx| context.apply_ops(operations, cx))?;
495            this.update(&mut cx, |this, cx| {
496                if let Some(existing_context) = this.loaded_context_for_id(&context_id, cx) {
497                    existing_context
498                } else {
499                    this.register_context(&context, cx);
500                    this.synchronize_contexts(cx);
501                    context
502                }
503            })
504        })
505    }
506
507    pub fn open_local_context(
508        &mut self,
509        path: PathBuf,
510        cx: &ModelContext<Self>,
511    ) -> Task<Result<Model<Context>>> {
512        if let Some(existing_context) = self.loaded_context_for_path(&path, cx) {
513            return Task::ready(Ok(existing_context));
514        }
515
516        let fs = self.fs.clone();
517        let languages = self.languages.clone();
518        let project = self.project.clone();
519        let telemetry = self.telemetry.clone();
520        let load = cx.background_executor().spawn({
521            let path = path.clone();
522            async move {
523                let saved_context = fs.load(&path).await?;
524                SavedContext::from_json(&saved_context)
525            }
526        });
527        let prompt_builder = self.prompt_builder.clone();
528        let slash_commands = self.slash_commands.clone();
529        let tools = self.tools.clone();
530
531        cx.spawn(|this, mut cx| async move {
532            let saved_context = load.await?;
533            let context = cx.new_model(|cx| {
534                Context::deserialize(
535                    saved_context,
536                    path.clone(),
537                    languages,
538                    prompt_builder,
539                    slash_commands,
540                    tools,
541                    Some(project),
542                    Some(telemetry),
543                    cx,
544                )
545            })?;
546            this.update(&mut cx, |this, cx| {
547                if let Some(existing_context) = this.loaded_context_for_path(&path, cx) {
548                    existing_context
549                } else {
550                    this.register_context(&context, cx);
551                    context
552                }
553            })
554        })
555    }
556
557    fn loaded_context_for_path(&self, path: &Path, cx: &AppContext) -> Option<Model<Context>> {
558        self.contexts.iter().find_map(|context| {
559            let context = context.upgrade()?;
560            if context.read(cx).path() == Some(path) {
561                Some(context)
562            } else {
563                None
564            }
565        })
566    }
567
568    pub(super) fn loaded_context_for_id(
569        &self,
570        id: &ContextId,
571        cx: &AppContext,
572    ) -> Option<Model<Context>> {
573        self.contexts.iter().find_map(|context| {
574            let context = context.upgrade()?;
575            if context.read(cx).id() == id {
576                Some(context)
577            } else {
578                None
579            }
580        })
581    }
582
583    pub fn open_remote_context(
584        &mut self,
585        context_id: ContextId,
586        cx: &mut ModelContext<Self>,
587    ) -> Task<Result<Model<Context>>> {
588        let project = self.project.read(cx);
589        let Some(project_id) = project.remote_id() else {
590            return Task::ready(Err(anyhow!("project was not remote")));
591        };
592
593        if let Some(context) = self.loaded_context_for_id(&context_id, cx) {
594            return Task::ready(Ok(context));
595        }
596
597        let replica_id = project.replica_id();
598        let capability = project.capability();
599        let language_registry = self.languages.clone();
600        let project = self.project.clone();
601        let telemetry = self.telemetry.clone();
602        let request = self.client.request(proto::OpenContext {
603            project_id,
604            context_id: context_id.to_proto(),
605        });
606        let prompt_builder = self.prompt_builder.clone();
607        let slash_commands = self.slash_commands.clone();
608        let tools = self.tools.clone();
609        cx.spawn(|this, mut cx| async move {
610            let response = request.await?;
611            let context_proto = response.context.context("invalid context")?;
612            let context = cx.new_model(|cx| {
613                Context::new(
614                    context_id.clone(),
615                    replica_id,
616                    capability,
617                    language_registry,
618                    prompt_builder,
619                    slash_commands,
620                    tools,
621                    Some(project),
622                    Some(telemetry),
623                    cx,
624                )
625            })?;
626            let operations = cx
627                .background_executor()
628                .spawn(async move {
629                    context_proto
630                        .operations
631                        .into_iter()
632                        .map(ContextOperation::from_proto)
633                        .collect::<Result<Vec<_>>>()
634                })
635                .await?;
636            context.update(&mut cx, |context, cx| context.apply_ops(operations, cx))?;
637            this.update(&mut cx, |this, cx| {
638                if let Some(existing_context) = this.loaded_context_for_id(&context_id, cx) {
639                    existing_context
640                } else {
641                    this.register_context(&context, cx);
642                    this.synchronize_contexts(cx);
643                    context
644                }
645            })
646        })
647    }
648
649    fn register_context(&mut self, context: &Model<Context>, cx: &mut ModelContext<Self>) {
650        let handle = if self.project_is_shared {
651            ContextHandle::Strong(context.clone())
652        } else {
653            ContextHandle::Weak(context.downgrade())
654        };
655        self.contexts.push(handle);
656        self.advertise_contexts(cx);
657        cx.subscribe(context, Self::handle_context_event).detach();
658    }
659
660    fn handle_context_event(
661        &mut self,
662        context: Model<Context>,
663        event: &ContextEvent,
664        cx: &mut ModelContext<Self>,
665    ) {
666        let Some(project_id) = self.project.read(cx).remote_id() else {
667            return;
668        };
669
670        match event {
671            ContextEvent::SummaryChanged => {
672                self.advertise_contexts(cx);
673            }
674            ContextEvent::Operation(operation) => {
675                let context_id = context.read(cx).id().to_proto();
676                let operation = operation.to_proto();
677                self.client
678                    .send(proto::UpdateContext {
679                        project_id,
680                        context_id,
681                        operation: Some(operation),
682                    })
683                    .log_err();
684            }
685            _ => {}
686        }
687    }
688
689    fn advertise_contexts(&self, cx: &AppContext) {
690        let Some(project_id) = self.project.read(cx).remote_id() else {
691            return;
692        };
693
694        // For now, only the host can advertise their open contexts.
695        if self.project.read(cx).is_via_collab() {
696            return;
697        }
698
699        let contexts = self
700            .contexts
701            .iter()
702            .rev()
703            .filter_map(|context| {
704                let context = context.upgrade()?.read(cx);
705                if context.replica_id() == ReplicaId::default() {
706                    Some(proto::ContextMetadata {
707                        context_id: context.id().to_proto(),
708                        summary: context.summary().map(|summary| summary.text.clone()),
709                    })
710                } else {
711                    None
712                }
713            })
714            .collect();
715        self.client
716            .send(proto::AdvertiseContexts {
717                project_id,
718                contexts,
719            })
720            .ok();
721    }
722
723    fn synchronize_contexts(&mut self, cx: &mut ModelContext<Self>) {
724        let Some(project_id) = self.project.read(cx).remote_id() else {
725            return;
726        };
727
728        let contexts = self
729            .contexts
730            .iter()
731            .filter_map(|context| {
732                let context = context.upgrade()?.read(cx);
733                if context.replica_id() != ReplicaId::default() {
734                    Some(context.version(cx).to_proto(context.id().clone()))
735                } else {
736                    None
737                }
738            })
739            .collect();
740
741        let client = self.client.clone();
742        let request = self.client.request(proto::SynchronizeContexts {
743            project_id,
744            contexts,
745        });
746        cx.spawn(|this, cx| async move {
747            let response = request.await?;
748
749            let mut context_ids = Vec::new();
750            let mut operations = Vec::new();
751            this.read_with(&cx, |this, cx| {
752                for context_version_proto in response.contexts {
753                    let context_version = ContextVersion::from_proto(&context_version_proto);
754                    let context_id = ContextId::from_proto(context_version_proto.context_id);
755                    if let Some(context) = this.loaded_context_for_id(&context_id, cx) {
756                        context_ids.push(context_id);
757                        operations.push(context.read(cx).serialize_ops(&context_version, cx));
758                    }
759                }
760            })?;
761
762            let operations = futures::future::join_all(operations).await;
763            for (context_id, operations) in context_ids.into_iter().zip(operations) {
764                for operation in operations {
765                    client.send(proto::UpdateContext {
766                        project_id,
767                        context_id: context_id.to_proto(),
768                        operation: Some(operation),
769                    })?;
770                }
771            }
772
773            anyhow::Ok(())
774        })
775        .detach_and_log_err(cx);
776    }
777
778    pub fn search(&self, query: String, cx: &AppContext) -> Task<Vec<SavedContextMetadata>> {
779        let metadata = self.contexts_metadata.clone();
780        let executor = cx.background_executor().clone();
781        cx.background_executor().spawn(async move {
782            if query.is_empty() {
783                metadata
784            } else {
785                let candidates = metadata
786                    .iter()
787                    .enumerate()
788                    .map(|(id, metadata)| StringMatchCandidate::new(id, metadata.title.clone()))
789                    .collect::<Vec<_>>();
790                let matches = fuzzy::match_strings(
791                    &candidates,
792                    &query,
793                    false,
794                    100,
795                    &Default::default(),
796                    executor,
797                )
798                .await;
799
800                matches
801                    .into_iter()
802                    .map(|mat| metadata[mat.candidate_id].clone())
803                    .collect()
804            }
805        })
806    }
807
808    pub fn host_contexts(&self) -> &[RemoteContextMetadata] {
809        &self.host_contexts
810    }
811
812    fn reload(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
813        let fs = self.fs.clone();
814        cx.spawn(|this, mut cx| async move {
815            fs.create_dir(contexts_dir()).await?;
816
817            let mut paths = fs.read_dir(contexts_dir()).await?;
818            let mut contexts = Vec::<SavedContextMetadata>::new();
819            while let Some(path) = paths.next().await {
820                let path = path?;
821                if path.extension() != Some(OsStr::new("json")) {
822                    continue;
823                }
824
825                let pattern = r" - \d+.zed.json$";
826                let re = Regex::new(pattern).unwrap();
827
828                let metadata = fs.metadata(&path).await?;
829                if let Some((file_name, metadata)) = path
830                    .file_name()
831                    .and_then(|name| name.to_str())
832                    .zip(metadata)
833                {
834                    // This is used to filter out contexts saved by the new assistant.
835                    if !re.is_match(file_name) {
836                        continue;
837                    }
838
839                    if let Some(title) = re.replace(file_name, "").lines().next() {
840                        contexts.push(SavedContextMetadata {
841                            title: title.to_string(),
842                            path,
843                            mtime: metadata.mtime.into(),
844                        });
845                    }
846                }
847            }
848            contexts.sort_unstable_by_key(|context| Reverse(context.mtime));
849
850            this.update(&mut cx, |this, cx| {
851                this.contexts_metadata = contexts;
852                cx.notify();
853            })
854        })
855    }
856
857    pub fn restart_context_servers(&mut self, cx: &mut ModelContext<Self>) {
858        cx.update_model(
859            &self.context_server_manager,
860            |context_server_manager, cx| {
861                for server in context_server_manager.servers() {
862                    context_server_manager
863                        .restart_server(&server.id(), cx)
864                        .detach_and_log_err(cx);
865                }
866            },
867        );
868    }
869
870    fn register_context_server_handlers(&self, cx: &mut ModelContext<Self>) {
871        cx.subscribe(
872            &self.context_server_manager.clone(),
873            Self::handle_context_server_event,
874        )
875        .detach();
876    }
877
878    fn handle_context_server_event(
879        &mut self,
880        context_server_manager: Model<ContextServerManager>,
881        event: &context_servers::manager::Event,
882        cx: &mut ModelContext<Self>,
883    ) {
884        let slash_command_working_set = self.slash_commands.clone();
885        let tool_working_set = self.tools.clone();
886        match event {
887            context_servers::manager::Event::ServerStarted { server_id } => {
888                if let Some(server) = context_server_manager.read(cx).get_server(server_id) {
889                    let context_server_manager = context_server_manager.clone();
890                    cx.spawn({
891                        let server = server.clone();
892                        let server_id = server_id.clone();
893                        |this, mut cx| async move {
894                            let Some(protocol) = server.client() else {
895                                return;
896                            };
897
898                            if protocol.capable(context_servers::protocol::ServerCapability::Prompts) {
899                                if let Some(prompts) = protocol.list_prompts().await.log_err() {
900                                    let slash_command_ids = prompts
901                                        .into_iter()
902                                        .filter(context_server_command::acceptable_prompt)
903                                        .map(|prompt| {
904                                            log::info!(
905                                                "registering context server command: {:?}",
906                                                prompt.name
907                                            );
908                                            slash_command_working_set.insert(Arc::new(
909                                                context_server_command::ContextServerSlashCommand::new(
910                                                    context_server_manager.clone(),
911                                                    &server,
912                                                    prompt,
913                                                ),
914                                            ))
915                                        })
916                                        .collect::<Vec<_>>();
917
918                                    this.update(&mut cx, |this, _cx| {
919                                        this.context_server_slash_command_ids
920                                            .insert(server_id.clone(), slash_command_ids);
921                                    })
922                                    .log_err();
923                                }
924                            }
925
926                            if protocol.capable(context_servers::protocol::ServerCapability::Tools) {
927                                if let Some(tools) = protocol.list_tools().await.log_err() {
928                                    let tool_ids = tools.tools.into_iter().map(|tool| {
929                                        log::info!("registering context server tool: {:?}", tool.name);
930                                        tool_working_set.insert(
931                                            Arc::new(tools::context_server_tool::ContextServerTool::new(
932                                                context_server_manager.clone(),
933                                                server.id(),
934                                                tool,
935                                            )),
936                                        )
937
938                                    }).collect::<Vec<_>>();
939
940
941                                    this.update(&mut cx, |this, _cx| {
942                                        this.context_server_tool_ids
943                                            .insert(server_id, tool_ids);
944                                    })
945                                    .log_err();
946                                }
947                            }
948                        }
949                    })
950                    .detach();
951                }
952            }
953            context_servers::manager::Event::ServerStopped { server_id } => {
954                if let Some(slash_command_ids) =
955                    self.context_server_slash_command_ids.remove(server_id)
956                {
957                    slash_command_working_set.remove(&slash_command_ids);
958                }
959
960                if let Some(tool_ids) = self.context_server_tool_ids.remove(server_id) {
961                    tool_working_set.remove(&tool_ids);
962                }
963            }
964        }
965    }
966}