context_server_store.rs

  1pub mod extension;
  2pub mod registry;
  3
  4use std::path::Path;
  5use std::sync::Arc;
  6use std::time::Duration;
  7
  8use anyhow::{Context as _, Result};
  9use collections::{HashMap, HashSet};
 10use context_server::{ContextServer, ContextServerCommand, ContextServerId};
 11use futures::{FutureExt as _, future::join_all};
 12use gpui::{App, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, actions};
 13use registry::ContextServerDescriptorRegistry;
 14use remote::RemoteClient;
 15use rpc::{AnyProtoClient, TypedEnvelope, proto};
 16use settings::{Settings as _, SettingsStore};
 17use util::{ResultExt as _, rel_path::RelPath};
 18
 19use crate::{
 20    Project,
 21    project_settings::{ContextServerSettings, ProjectSettings},
 22    worktree_store::WorktreeStore,
 23};
 24
 25/// Maximum timeout for context server requests
 26/// Prevents extremely large timeout values from tying up resources indefinitely.
 27const MAX_TIMEOUT_SECS: u64 = 600; // 10 minutes
 28
 29pub fn init(cx: &mut App) {
 30    extension::init(cx);
 31}
 32
 33actions!(
 34    context_server,
 35    [
 36        /// Restarts the context server.
 37        Restart
 38    ]
 39);
 40
 41#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 42pub enum ContextServerStatus {
 43    Starting,
 44    Running,
 45    Stopped,
 46    Error(Arc<str>),
 47}
 48
 49impl ContextServerStatus {
 50    fn from_state(state: &ContextServerState) -> Self {
 51        match state {
 52            ContextServerState::Starting { .. } => ContextServerStatus::Starting,
 53            ContextServerState::Running { .. } => ContextServerStatus::Running,
 54            ContextServerState::Stopped { .. } => ContextServerStatus::Stopped,
 55            ContextServerState::Error { error, .. } => ContextServerStatus::Error(error.clone()),
 56        }
 57    }
 58}
 59
 60enum ContextServerState {
 61    Starting {
 62        server: Arc<ContextServer>,
 63        configuration: Arc<ContextServerConfiguration>,
 64        _task: Task<()>,
 65    },
 66    Running {
 67        server: Arc<ContextServer>,
 68        configuration: Arc<ContextServerConfiguration>,
 69    },
 70    Stopped {
 71        server: Arc<ContextServer>,
 72        configuration: Arc<ContextServerConfiguration>,
 73    },
 74    Error {
 75        server: Arc<ContextServer>,
 76        configuration: Arc<ContextServerConfiguration>,
 77        error: Arc<str>,
 78    },
 79}
 80
 81impl ContextServerState {
 82    pub fn server(&self) -> Arc<ContextServer> {
 83        match self {
 84            ContextServerState::Starting { server, .. } => server.clone(),
 85            ContextServerState::Running { server, .. } => server.clone(),
 86            ContextServerState::Stopped { server, .. } => server.clone(),
 87            ContextServerState::Error { server, .. } => server.clone(),
 88        }
 89    }
 90
 91    pub fn configuration(&self) -> Arc<ContextServerConfiguration> {
 92        match self {
 93            ContextServerState::Starting { configuration, .. } => configuration.clone(),
 94            ContextServerState::Running { configuration, .. } => configuration.clone(),
 95            ContextServerState::Stopped { configuration, .. } => configuration.clone(),
 96            ContextServerState::Error { configuration, .. } => configuration.clone(),
 97        }
 98    }
 99}
100
101#[derive(Debug, PartialEq, Eq)]
102pub enum ContextServerConfiguration {
103    Custom {
104        command: ContextServerCommand,
105        remote: bool,
106    },
107    Extension {
108        command: ContextServerCommand,
109        settings: serde_json::Value,
110        remote: bool,
111    },
112    Http {
113        url: url::Url,
114        headers: HashMap<String, String>,
115        timeout: Option<u64>,
116    },
117}
118
119impl ContextServerConfiguration {
120    pub fn command(&self) -> Option<&ContextServerCommand> {
121        match self {
122            ContextServerConfiguration::Custom { command, .. } => Some(command),
123            ContextServerConfiguration::Extension { command, .. } => Some(command),
124            ContextServerConfiguration::Http { .. } => None,
125        }
126    }
127
128    pub fn remote(&self) -> bool {
129        match self {
130            ContextServerConfiguration::Custom { remote, .. } => *remote,
131            ContextServerConfiguration::Extension { remote, .. } => *remote,
132            ContextServerConfiguration::Http { .. } => false,
133        }
134    }
135
136    pub async fn from_settings(
137        settings: ContextServerSettings,
138        id: ContextServerId,
139        registry: Entity<ContextServerDescriptorRegistry>,
140        worktree_store: Entity<WorktreeStore>,
141        cx: &AsyncApp,
142    ) -> Option<Self> {
143        match settings {
144            ContextServerSettings::Stdio {
145                enabled: _,
146                command,
147                remote,
148            } => Some(ContextServerConfiguration::Custom { command, remote }),
149            ContextServerSettings::Extension {
150                enabled: _,
151                settings,
152                remote,
153            } => {
154                let descriptor =
155                    cx.update(|cx| registry.read(cx).context_server_descriptor(&id.0))?;
156
157                match descriptor.command(worktree_store, cx).await {
158                    Ok(command) => Some(ContextServerConfiguration::Extension {
159                        command,
160                        settings,
161                        remote,
162                    }),
163                    Err(e) => {
164                        log::error!(
165                            "Failed to create context server configuration from settings: {e:#}"
166                        );
167                        None
168                    }
169                }
170            }
171            ContextServerSettings::Http {
172                enabled: _,
173                url,
174                headers: auth,
175                timeout,
176            } => {
177                let url = url::Url::parse(&url).log_err()?;
178                Some(ContextServerConfiguration::Http {
179                    url,
180                    headers: auth,
181                    timeout,
182                })
183            }
184        }
185    }
186}
187
188pub type ContextServerFactory =
189    Box<dyn Fn(ContextServerId, Arc<ContextServerConfiguration>) -> Arc<ContextServer>>;
190
191enum ContextServerStoreState {
192    Local {
193        downstream_client: Option<(u64, AnyProtoClient)>,
194        is_headless: bool,
195    },
196    Remote {
197        project_id: u64,
198        upstream_client: Entity<RemoteClient>,
199    },
200}
201
202pub struct ContextServerStore {
203    state: ContextServerStoreState,
204    context_server_settings: HashMap<Arc<str>, ContextServerSettings>,
205    servers: HashMap<ContextServerId, ContextServerState>,
206    worktree_store: Entity<WorktreeStore>,
207    project: Option<WeakEntity<Project>>,
208    registry: Entity<ContextServerDescriptorRegistry>,
209    update_servers_task: Option<Task<Result<()>>>,
210    context_server_factory: Option<ContextServerFactory>,
211    needs_server_update: bool,
212    _subscriptions: Vec<Subscription>,
213}
214
215pub enum Event {
216    ServerStatusChanged {
217        server_id: ContextServerId,
218        status: ContextServerStatus,
219    },
220}
221
222impl EventEmitter<Event> for ContextServerStore {}
223
224impl ContextServerStore {
225    pub fn local(
226        worktree_store: Entity<WorktreeStore>,
227        weak_project: Option<WeakEntity<Project>>,
228        headless: bool,
229        cx: &mut Context<Self>,
230    ) -> Self {
231        Self::new_internal(
232            !headless,
233            None,
234            ContextServerDescriptorRegistry::default_global(cx),
235            worktree_store,
236            weak_project,
237            ContextServerStoreState::Local {
238                downstream_client: None,
239                is_headless: headless,
240            },
241            cx,
242        )
243    }
244
245    pub fn remote(
246        project_id: u64,
247        upstream_client: Entity<RemoteClient>,
248        worktree_store: Entity<WorktreeStore>,
249        weak_project: Option<WeakEntity<Project>>,
250        cx: &mut Context<Self>,
251    ) -> Self {
252        Self::new_internal(
253            true,
254            None,
255            ContextServerDescriptorRegistry::default_global(cx),
256            worktree_store,
257            weak_project,
258            ContextServerStoreState::Remote {
259                project_id,
260                upstream_client,
261            },
262            cx,
263        )
264    }
265
266    pub fn init_headless(session: &AnyProtoClient) {
267        session.add_entity_request_handler(Self::handle_get_context_server_command);
268    }
269
270    pub fn shared(&mut self, project_id: u64, client: AnyProtoClient) {
271        if let ContextServerStoreState::Local {
272            downstream_client, ..
273        } = &mut self.state
274        {
275            *downstream_client = Some((project_id, client));
276        }
277    }
278
279    pub fn is_remote_project(&self) -> bool {
280        matches!(self.state, ContextServerStoreState::Remote { .. })
281    }
282
283    /// Returns all configured context server ids, excluding the ones that are disabled
284    pub fn configured_server_ids(&self) -> Vec<ContextServerId> {
285        self.context_server_settings
286            .iter()
287            .filter(|(_, settings)| settings.enabled())
288            .map(|(id, _)| ContextServerId(id.clone()))
289            .collect()
290    }
291
292    #[cfg(feature = "test-support")]
293    pub fn test(
294        registry: Entity<ContextServerDescriptorRegistry>,
295        worktree_store: Entity<WorktreeStore>,
296        weak_project: Option<WeakEntity<Project>>,
297        cx: &mut Context<Self>,
298    ) -> Self {
299        Self::new_internal(
300            false,
301            None,
302            registry,
303            worktree_store,
304            weak_project,
305            ContextServerStoreState::Local {
306                downstream_client: None,
307                is_headless: false,
308            },
309            cx,
310        )
311    }
312
313    #[cfg(feature = "test-support")]
314    pub fn test_maintain_server_loop(
315        context_server_factory: Option<ContextServerFactory>,
316        registry: Entity<ContextServerDescriptorRegistry>,
317        worktree_store: Entity<WorktreeStore>,
318        weak_project: Option<WeakEntity<Project>>,
319        cx: &mut Context<Self>,
320    ) -> Self {
321        Self::new_internal(
322            true,
323            context_server_factory,
324            registry,
325            worktree_store,
326            weak_project,
327            ContextServerStoreState::Local {
328                downstream_client: None,
329                is_headless: false,
330            },
331            cx,
332        )
333    }
334
335    #[cfg(feature = "test-support")]
336    pub fn set_context_server_factory(&mut self, factory: ContextServerFactory) {
337        self.context_server_factory = Some(factory);
338    }
339
340    #[cfg(feature = "test-support")]
341    pub fn registry(&self) -> &Entity<ContextServerDescriptorRegistry> {
342        &self.registry
343    }
344
345    #[cfg(feature = "test-support")]
346    pub fn test_start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
347        let configuration = Arc::new(ContextServerConfiguration::Custom {
348            command: ContextServerCommand {
349                path: "test".into(),
350                args: vec![],
351                env: None,
352                timeout: None,
353            },
354            remote: false,
355        });
356        self.run_server(server, configuration, cx);
357    }
358
359    fn new_internal(
360        maintain_server_loop: bool,
361        context_server_factory: Option<ContextServerFactory>,
362        registry: Entity<ContextServerDescriptorRegistry>,
363        worktree_store: Entity<WorktreeStore>,
364        weak_project: Option<WeakEntity<Project>>,
365        state: ContextServerStoreState,
366        cx: &mut Context<Self>,
367    ) -> Self {
368        let mut subscriptions = vec![cx.observe_global::<SettingsStore>(move |this, cx| {
369            let settings =
370                &Self::resolve_project_settings(&this.worktree_store, cx).context_servers;
371            if &this.context_server_settings == settings {
372                return;
373            }
374            this.context_server_settings = settings.clone();
375            if maintain_server_loop {
376                this.available_context_servers_changed(cx);
377            }
378        })];
379
380        if maintain_server_loop {
381            subscriptions.push(cx.observe(&registry, |this, _registry, cx| {
382                this.available_context_servers_changed(cx);
383            }));
384        }
385
386        let mut this = Self {
387            state,
388            _subscriptions: subscriptions,
389            context_server_settings: Self::resolve_project_settings(&worktree_store, cx)
390                .context_servers
391                .clone(),
392            worktree_store,
393            project: weak_project,
394            registry,
395            needs_server_update: false,
396            servers: HashMap::default(),
397            update_servers_task: None,
398            context_server_factory,
399        };
400        if maintain_server_loop {
401            this.available_context_servers_changed(cx);
402        }
403        this
404    }
405
406    pub fn get_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
407        self.servers.get(id).map(|state| state.server())
408    }
409
410    pub fn get_running_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
411        if let Some(ContextServerState::Running { server, .. }) = self.servers.get(id) {
412            Some(server.clone())
413        } else {
414            None
415        }
416    }
417
418    pub fn status_for_server(&self, id: &ContextServerId) -> Option<ContextServerStatus> {
419        self.servers.get(id).map(ContextServerStatus::from_state)
420    }
421
422    pub fn configuration_for_server(
423        &self,
424        id: &ContextServerId,
425    ) -> Option<Arc<ContextServerConfiguration>> {
426        self.servers.get(id).map(|state| state.configuration())
427    }
428
429    pub fn server_ids(&self, cx: &App) -> HashSet<ContextServerId> {
430        self.servers
431            .keys()
432            .cloned()
433            .chain(
434                self.registry
435                    .read(cx)
436                    .context_server_descriptors()
437                    .into_iter()
438                    .map(|(id, _)| ContextServerId(id)),
439            )
440            .collect()
441    }
442
443    pub fn running_servers(&self) -> Vec<Arc<ContextServer>> {
444        self.servers
445            .values()
446            .filter_map(|state| {
447                if let ContextServerState::Running { server, .. } = state {
448                    Some(server.clone())
449                } else {
450                    None
451                }
452            })
453            .collect()
454    }
455
456    pub fn start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
457        cx.spawn(async move |this, cx| {
458            let this = this.upgrade().context("Context server store dropped")?;
459            let settings = this
460                .update(cx, |this, _| {
461                    this.context_server_settings.get(&server.id().0).cloned()
462                })
463                .context("Failed to get context server settings")?;
464
465            if !settings.enabled() {
466                return anyhow::Ok(());
467            }
468
469            let (registry, worktree_store) = this.update(cx, |this, _| {
470                (this.registry.clone(), this.worktree_store.clone())
471            });
472            let configuration = ContextServerConfiguration::from_settings(
473                settings,
474                server.id(),
475                registry,
476                worktree_store,
477                cx,
478            )
479            .await
480            .context("Failed to create context server configuration")?;
481
482            this.update(cx, |this, cx| {
483                this.run_server(server, Arc::new(configuration), cx)
484            });
485            Ok(())
486        })
487        .detach_and_log_err(cx);
488    }
489
490    pub fn stop_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
491        if matches!(
492            self.servers.get(id),
493            Some(ContextServerState::Stopped { .. })
494        ) {
495            return Ok(());
496        }
497
498        let state = self
499            .servers
500            .remove(id)
501            .context("Context server not found")?;
502
503        let server = state.server();
504        let configuration = state.configuration();
505        let mut result = Ok(());
506        if let ContextServerState::Running { server, .. } = &state {
507            result = server.stop();
508        }
509        drop(state);
510
511        self.update_server_state(
512            id.clone(),
513            ContextServerState::Stopped {
514                configuration,
515                server,
516            },
517            cx,
518        );
519
520        result
521    }
522
523    fn run_server(
524        &mut self,
525        server: Arc<ContextServer>,
526        configuration: Arc<ContextServerConfiguration>,
527        cx: &mut Context<Self>,
528    ) {
529        let id = server.id();
530        if matches!(
531            self.servers.get(&id),
532            Some(ContextServerState::Starting { .. } | ContextServerState::Running { .. })
533        ) {
534            self.stop_server(&id, cx).log_err();
535        }
536        let task = cx.spawn({
537            let id = server.id();
538            let server = server.clone();
539            let configuration = configuration.clone();
540
541            async move |this, cx| {
542                match server.clone().start(cx).await {
543                    Ok(_) => {
544                        debug_assert!(server.client().is_some());
545
546                        this.update(cx, |this, cx| {
547                            this.update_server_state(
548                                id.clone(),
549                                ContextServerState::Running {
550                                    server,
551                                    configuration,
552                                },
553                                cx,
554                            )
555                        })
556                        .log_err()
557                    }
558                    Err(err) => {
559                        log::error!("{} context server failed to start: {}", id, err);
560                        this.update(cx, |this, cx| {
561                            this.update_server_state(
562                                id.clone(),
563                                ContextServerState::Error {
564                                    configuration,
565                                    server,
566                                    error: err.to_string().into(),
567                                },
568                                cx,
569                            )
570                        })
571                        .log_err()
572                    }
573                };
574            }
575        });
576
577        self.update_server_state(
578            id.clone(),
579            ContextServerState::Starting {
580                configuration,
581                _task: task,
582                server,
583            },
584            cx,
585        );
586    }
587
588    fn remove_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
589        let state = self
590            .servers
591            .remove(id)
592            .context("Context server not found")?;
593        drop(state);
594        cx.emit(Event::ServerStatusChanged {
595            server_id: id.clone(),
596            status: ContextServerStatus::Stopped,
597        });
598        Ok(())
599    }
600
601    pub async fn create_context_server(
602        this: WeakEntity<Self>,
603        id: ContextServerId,
604        configuration: Arc<ContextServerConfiguration>,
605        cx: &mut AsyncApp,
606    ) -> Result<(Arc<ContextServer>, Arc<ContextServerConfiguration>)> {
607        let remote = configuration.remote();
608        let needs_remote_command = match configuration.as_ref() {
609            ContextServerConfiguration::Custom { .. }
610            | ContextServerConfiguration::Extension { .. } => remote,
611            ContextServerConfiguration::Http { .. } => false,
612        };
613
614        let (remote_state, is_remote_project) = this.update(cx, |this, _| {
615            let remote_state = match &this.state {
616                ContextServerStoreState::Remote {
617                    project_id,
618                    upstream_client,
619                } if needs_remote_command => Some((*project_id, upstream_client.clone())),
620                _ => None,
621            };
622            (remote_state, this.is_remote_project())
623        })?;
624
625        let root_path: Option<Arc<Path>> = this.update(cx, |this, cx| {
626            this.project
627                .as_ref()
628                .and_then(|project| {
629                    project
630                        .read_with(cx, |project, cx| project.active_project_directory(cx))
631                        .ok()
632                        .flatten()
633                })
634                .or_else(|| {
635                    this.worktree_store.read_with(cx, |store, cx| {
636                        store.visible_worktrees(cx).fold(None, |acc, item| {
637                            if acc.is_none() {
638                                item.read(cx).root_dir()
639                            } else {
640                                acc
641                            }
642                        })
643                    })
644                })
645        })?;
646
647        let configuration = if let Some((project_id, upstream_client)) = remote_state {
648            let root_dir = root_path.as_ref().map(|p| p.display().to_string());
649
650            let response = upstream_client
651                .update(cx, |client, _| {
652                    client
653                        .proto_client()
654                        .request(proto::GetContextServerCommand {
655                            project_id,
656                            server_id: id.0.to_string(),
657                            root_dir: root_dir.clone(),
658                        })
659                })
660                .await?;
661
662            let remote_command = upstream_client.update(cx, |client, _| {
663                client.build_command(
664                    Some(response.path),
665                    &response.args,
666                    &response.env.into_iter().collect(),
667                    root_dir,
668                    None,
669                )
670            })?;
671
672            let command = ContextServerCommand {
673                path: remote_command.program.into(),
674                args: remote_command.args,
675                env: Some(remote_command.env.into_iter().collect()),
676                timeout: None,
677            };
678
679            Arc::new(ContextServerConfiguration::Custom { command, remote })
680        } else {
681            configuration
682        };
683
684        let server: Arc<ContextServer> = this.update(cx, |this, cx| {
685            let global_timeout =
686                Self::resolve_project_settings(&this.worktree_store, cx).context_server_timeout;
687
688            if let Some(factory) = this.context_server_factory.as_ref() {
689                return anyhow::Ok(factory(id.clone(), configuration.clone()));
690            }
691
692            match configuration.as_ref() {
693                ContextServerConfiguration::Http {
694                    url,
695                    headers,
696                    timeout,
697                } => anyhow::Ok(Arc::new(ContextServer::http(
698                    id,
699                    url,
700                    headers.clone(),
701                    cx.http_client(),
702                    cx.background_executor().clone(),
703                    Some(Duration::from_secs(
704                        timeout.unwrap_or(global_timeout).min(MAX_TIMEOUT_SECS),
705                    )),
706                )?)),
707                _ => {
708                    let mut command = configuration
709                        .command()
710                        .context("Missing command configuration for stdio context server")?
711                        .clone();
712                    command.timeout = Some(
713                        command
714                            .timeout
715                            .unwrap_or(global_timeout)
716                            .min(MAX_TIMEOUT_SECS),
717                    );
718
719                    // Don't pass remote paths as working directory for locally-spawned processes
720                    let working_directory = if is_remote_project { None } else { root_path };
721                    anyhow::Ok(Arc::new(ContextServer::stdio(
722                        id,
723                        command,
724                        working_directory,
725                    )))
726                }
727            }
728        })??;
729
730        Ok((server, configuration))
731    }
732
733    async fn handle_get_context_server_command(
734        this: Entity<Self>,
735        envelope: TypedEnvelope<proto::GetContextServerCommand>,
736        mut cx: AsyncApp,
737    ) -> Result<proto::ContextServerCommand> {
738        let server_id = ContextServerId(envelope.payload.server_id.into());
739
740        let (settings, registry, worktree_store) = this.update(&mut cx, |this, inner_cx| {
741            let ContextServerStoreState::Local {
742                is_headless: true, ..
743            } = &this.state
744            else {
745                anyhow::bail!("unexpected GetContextServerCommand request in a non-local project");
746            };
747
748            let settings = this
749                .context_server_settings
750                .get(&server_id.0)
751                .cloned()
752                .or_else(|| {
753                    this.registry
754                        .read(inner_cx)
755                        .context_server_descriptor(&server_id.0)
756                        .map(|_| ContextServerSettings::default_extension())
757                })
758                .with_context(|| format!("context server `{}` not found", server_id))?;
759
760            anyhow::Ok((settings, this.registry.clone(), this.worktree_store.clone()))
761        })?;
762
763        let configuration = ContextServerConfiguration::from_settings(
764            settings,
765            server_id.clone(),
766            registry,
767            worktree_store,
768            &cx,
769        )
770        .await
771        .with_context(|| format!("failed to build configuration for `{}`", server_id))?;
772
773        let command = configuration
774            .command()
775            .context("context server has no command (HTTP servers don't need RPC)")?;
776
777        Ok(proto::ContextServerCommand {
778            path: command.path.display().to_string(),
779            args: command.args.clone(),
780            env: command
781                .env
782                .clone()
783                .map(|env| env.into_iter().collect())
784                .unwrap_or_default(),
785        })
786    }
787
788    fn resolve_project_settings<'a>(
789        worktree_store: &'a Entity<WorktreeStore>,
790        cx: &'a App,
791    ) -> &'a ProjectSettings {
792        let location = worktree_store
793            .read(cx)
794            .visible_worktrees(cx)
795            .next()
796            .map(|worktree| settings::SettingsLocation {
797                worktree_id: worktree.read(cx).id(),
798                path: RelPath::empty(),
799            });
800        ProjectSettings::get(location, cx)
801    }
802
803    fn update_server_state(
804        &mut self,
805        id: ContextServerId,
806        state: ContextServerState,
807        cx: &mut Context<Self>,
808    ) {
809        let status = ContextServerStatus::from_state(&state);
810        self.servers.insert(id.clone(), state);
811        cx.emit(Event::ServerStatusChanged {
812            server_id: id,
813            status,
814        });
815    }
816
817    fn available_context_servers_changed(&mut self, cx: &mut Context<Self>) {
818        if self.update_servers_task.is_some() {
819            self.needs_server_update = true;
820        } else {
821            self.needs_server_update = false;
822            self.update_servers_task = Some(cx.spawn(async move |this, cx| {
823                if let Err(err) = Self::maintain_servers(this.clone(), cx).await {
824                    log::error!("Error maintaining context servers: {}", err);
825                }
826
827                this.update(cx, |this, cx| {
828                    this.update_servers_task.take();
829                    if this.needs_server_update {
830                        this.available_context_servers_changed(cx);
831                    }
832                })?;
833
834                Ok(())
835            }));
836        }
837    }
838
839    async fn maintain_servers(this: WeakEntity<Self>, cx: &mut AsyncApp) -> Result<()> {
840        let (mut configured_servers, registry, worktree_store) = this.update(cx, |this, _| {
841            (
842                this.context_server_settings.clone(),
843                this.registry.clone(),
844                this.worktree_store.clone(),
845            )
846        })?;
847
848        for (id, _) in registry.read_with(cx, |registry, _| registry.context_server_descriptors()) {
849            configured_servers
850                .entry(id)
851                .or_insert(ContextServerSettings::default_extension());
852        }
853
854        let (enabled_servers, disabled_servers): (HashMap<_, _>, HashMap<_, _>) =
855            configured_servers
856                .into_iter()
857                .partition(|(_, settings)| settings.enabled());
858
859        let configured_servers = join_all(enabled_servers.into_iter().map(|(id, settings)| {
860            let id = ContextServerId(id);
861            ContextServerConfiguration::from_settings(
862                settings,
863                id.clone(),
864                registry.clone(),
865                worktree_store.clone(),
866                cx,
867            )
868            .map(move |config| (id, config))
869        }))
870        .await
871        .into_iter()
872        .filter_map(|(id, config)| config.map(|config| (id, config)))
873        .collect::<HashMap<_, _>>();
874
875        let mut servers_to_start = Vec::new();
876        let mut servers_to_remove = HashSet::default();
877        let mut servers_to_stop = HashSet::default();
878
879        this.update(cx, |this, _cx| {
880            for server_id in this.servers.keys() {
881                // All servers that are not in desired_servers should be removed from the store.
882                // This can happen if the user removed a server from the context server settings.
883                if !configured_servers.contains_key(server_id) {
884                    if disabled_servers.contains_key(&server_id.0) {
885                        servers_to_stop.insert(server_id.clone());
886                    } else {
887                        servers_to_remove.insert(server_id.clone());
888                    }
889                }
890            }
891
892            for (id, config) in configured_servers {
893                let state = this.servers.get(&id);
894                let is_stopped = matches!(state, Some(ContextServerState::Stopped { .. }));
895                let existing_config = state.as_ref().map(|state| state.configuration());
896                if existing_config.as_deref() != Some(&config) || is_stopped {
897                    let config = Arc::new(config);
898                    servers_to_start.push((id.clone(), config));
899                    if this.servers.contains_key(&id) {
900                        servers_to_stop.insert(id);
901                    }
902                }
903            }
904
905            anyhow::Ok(())
906        })??;
907
908        this.update(cx, |this, inner_cx| {
909            for id in servers_to_stop {
910                this.stop_server(&id, inner_cx)?;
911            }
912            for id in servers_to_remove {
913                this.remove_server(&id, inner_cx)?;
914            }
915            anyhow::Ok(())
916        })??;
917
918        for (id, config) in servers_to_start {
919            let (server, config) =
920                Self::create_context_server(this.clone(), id, config, cx).await?;
921            this.update(cx, |this, cx| {
922                this.run_server(server, config, cx);
923            })?;
924        }
925
926        Ok(())
927    }
928}