context_server_store.rs

  1pub mod extension;
  2pub mod registry;
  3
  4use std::path::Path;
  5use std::sync::Arc;
  6use std::time::Duration;
  7
  8use anyhow::{Context as _, Result};
  9use collections::{HashMap, HashSet};
 10use context_server::{ContextServer, ContextServerCommand, ContextServerId};
 11use futures::{FutureExt as _, future::join_all};
 12use gpui::{App, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, actions};
 13use itertools::Itertools;
 14use registry::ContextServerDescriptorRegistry;
 15use remote::RemoteClient;
 16use rpc::{AnyProtoClient, TypedEnvelope, proto};
 17use settings::{Settings as _, SettingsStore};
 18use util::{ResultExt as _, rel_path::RelPath};
 19
 20use crate::{
 21    DisableAiSettings, Project,
 22    project_settings::{ContextServerSettings, ProjectSettings},
 23    worktree_store::WorktreeStore,
 24};
 25
/// Upper bound, in seconds, for context server timeouts.
///
/// Configured timeouts (per-server or global) are clamped to this value, which
/// prevents extremely large timeout values from tying up resources indefinitely.
const MAX_TIMEOUT_SECS: u64 = 600; // 10 minutes
 29
/// Initializes this module by delegating to `extension::init`.
pub fn init(cx: &mut App) {
    extension::init(cx);
}
 33
// Declares the `Restart` action under `context_server` through gpui's `actions!` macro.
actions!(
    context_server,
    [
        /// Restarts the context server.
        Restart
    ]
);
 41
/// Publicly observable status of a context server.
///
/// Derived from the store's internal `ContextServerState` via `from_state`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ContextServerStatus {
    /// A start task has been spawned for the server but has not finished yet.
    Starting,
    /// The server started successfully.
    Running,
    /// The server is not running.
    Stopped,
    /// The server failed to start; carries the error message.
    Error(Arc<str>),
}
 49
 50impl ContextServerStatus {
 51    fn from_state(state: &ContextServerState) -> Self {
 52        match state {
 53            ContextServerState::Starting { .. } => ContextServerStatus::Starting,
 54            ContextServerState::Running { .. } => ContextServerStatus::Running,
 55            ContextServerState::Stopped { .. } => ContextServerStatus::Stopped,
 56            ContextServerState::Error { error, .. } => ContextServerStatus::Error(error.clone()),
 57        }
 58    }
 59}
 60
/// Internal lifecycle state tracked by the store for each context server.
///
/// Every variant keeps the server handle together with the configuration it
/// was created from.
enum ContextServerState {
    /// A start task has been spawned and has not reported a result yet.
    Starting {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
        // Held so the start task lives as long as this state is stored.
        // NOTE(review): dropping a gpui `Task` presumably cancels it — confirm.
        _task: Task<()>,
    },
    /// The server started successfully.
    Running {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
    },
    /// The server was stopped.
    Stopped {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
    },
    /// The server failed to start; `error` holds the failure message.
    Error {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
        error: Arc<str>,
    },
}
 81
 82impl ContextServerState {
 83    pub fn server(&self) -> Arc<ContextServer> {
 84        match self {
 85            ContextServerState::Starting { server, .. } => server.clone(),
 86            ContextServerState::Running { server, .. } => server.clone(),
 87            ContextServerState::Stopped { server, .. } => server.clone(),
 88            ContextServerState::Error { server, .. } => server.clone(),
 89        }
 90    }
 91
 92    pub fn configuration(&self) -> Arc<ContextServerConfiguration> {
 93        match self {
 94            ContextServerState::Starting { configuration, .. } => configuration.clone(),
 95            ContextServerState::Running { configuration, .. } => configuration.clone(),
 96            ContextServerState::Stopped { configuration, .. } => configuration.clone(),
 97            ContextServerState::Error { configuration, .. } => configuration.clone(),
 98        }
 99    }
100}
101
/// Resolved configuration used to construct a context server.
#[derive(Debug, PartialEq, Eq)]
pub enum ContextServerConfiguration {
    /// A server launched from a user-provided command.
    Custom {
        command: ContextServerCommand,
        /// When set and the store is remote, the command is requested from the
        /// upstream client instead of being used as-is.
        remote: bool,
    },
    /// A server whose command is supplied by a registered descriptor.
    Extension {
        command: ContextServerCommand,
        settings: serde_json::Value,
        /// When set and the store is remote, the command is requested from the
        /// upstream client instead of being used as-is.
        remote: bool,
    },
    /// A server reached over HTTP rather than through a spawned command.
    Http {
        url: url::Url,
        headers: HashMap<String, String>,
        /// Optional per-server timeout in seconds; the global setting is used when absent.
        timeout: Option<u64>,
    },
}
119
120impl ContextServerConfiguration {
121    pub fn command(&self) -> Option<&ContextServerCommand> {
122        match self {
123            ContextServerConfiguration::Custom { command, .. } => Some(command),
124            ContextServerConfiguration::Extension { command, .. } => Some(command),
125            ContextServerConfiguration::Http { .. } => None,
126        }
127    }
128
129    pub fn remote(&self) -> bool {
130        match self {
131            ContextServerConfiguration::Custom { remote, .. } => *remote,
132            ContextServerConfiguration::Extension { remote, .. } => *remote,
133            ContextServerConfiguration::Http { .. } => false,
134        }
135    }
136
137    pub async fn from_settings(
138        settings: ContextServerSettings,
139        id: ContextServerId,
140        registry: Entity<ContextServerDescriptorRegistry>,
141        worktree_store: Entity<WorktreeStore>,
142        cx: &AsyncApp,
143    ) -> Option<Self> {
144        match settings {
145            ContextServerSettings::Stdio {
146                enabled: _,
147                command,
148                remote,
149            } => Some(ContextServerConfiguration::Custom { command, remote }),
150            ContextServerSettings::Extension {
151                enabled: _,
152                settings,
153                remote,
154            } => {
155                let descriptor =
156                    cx.update(|cx| registry.read(cx).context_server_descriptor(&id.0))?;
157
158                match descriptor.command(worktree_store, cx).await {
159                    Ok(command) => Some(ContextServerConfiguration::Extension {
160                        command,
161                        settings,
162                        remote,
163                    }),
164                    Err(e) => {
165                        log::error!(
166                            "Failed to create context server configuration from settings: {e:#}"
167                        );
168                        None
169                    }
170                }
171            }
172            ContextServerSettings::Http {
173                enabled: _,
174                url,
175                headers: auth,
176                timeout,
177            } => {
178                let url = url::Url::parse(&url).log_err()?;
179                Some(ContextServerConfiguration::Http {
180                    url,
181                    headers: auth,
182                    timeout,
183                })
184            }
185        }
186    }
187}
188
/// Callback that builds a [`ContextServer`] from an id and its configuration.
///
/// When one is set on the store, `create_context_server` uses it instead of
/// constructing an HTTP or stdio server itself.
pub type ContextServerFactory =
    Box<dyn Fn(ContextServerId, Arc<ContextServerConfiguration>) -> Arc<ContextServer>>;
191
/// Whether the store belongs to a local project or to a remote one.
enum ContextServerStoreState {
    Local {
        /// `(project_id, client)` recorded by `shared`; `None` until then.
        downstream_client: Option<(u64, AnyProtoClient)>,
        /// Whether this store was created in headless mode. Only headless local
        /// stores answer `GetContextServerCommand` requests.
        is_headless: bool,
    },
    Remote {
        project_id: u64,
        /// Client used to ask the upstream side for a server's command.
        upstream_client: Entity<RemoteClient>,
    },
}
202
/// Tracks the configured context servers of a project together with their
/// lifecycle state, and emits [`ServerStatusChangedEvent`] on state changes.
pub struct ContextServerStore {
    /// Local or remote mode, with the associated clients.
    state: ContextServerStoreState,
    /// Context server settings keyed by server id, as resolved from project settings.
    context_server_settings: HashMap<Arc<str>, ContextServerSettings>,
    /// Current lifecycle state of every known server.
    servers: HashMap<ContextServerId, ContextServerState>,
    /// Sorted, de-duplicated ids rebuilt by `populate_server_ids`.
    server_ids: Vec<ContextServerId>,
    worktree_store: Entity<WorktreeStore>,
    /// Optional project handle, consulted for the active project directory.
    project: Option<WeakEntity<Project>>,
    registry: Entity<ContextServerDescriptorRegistry>,
    /// In-flight server maintenance task, if any.
    update_servers_task: Option<Task<Result<()>>>,
    /// Optional override used to construct servers.
    context_server_factory: Option<ContextServerFactory>,
    /// Set when a change arrives while a maintenance task is already running,
    /// so that another pass is scheduled once it finishes.
    needs_server_update: bool,
    /// Last observed value of the "disable AI" setting.
    ai_disabled: bool,
    _subscriptions: Vec<Subscription>,
}
217
/// Event emitted by [`ContextServerStore`] whenever a server's state is
/// updated or the server is removed.
pub struct ServerStatusChangedEvent {
    /// Id of the server whose status changed.
    pub server_id: ContextServerId,
    /// The server's status after the change.
    pub status: ContextServerStatus,
}
222
// Lets the store emit `ServerStatusChangedEvent` to its subscribers.
impl EventEmitter<ServerStatusChangedEvent> for ContextServerStore {}
224
225impl ContextServerStore {
    /// Creates a store for a local project.
    ///
    /// The server maintenance loop is enabled only when `headless` is `false`.
    /// The store uses the default global descriptor registry, has no server
    /// factory override, and starts without a downstream client.
    pub fn local(
        worktree_store: Entity<WorktreeStore>,
        weak_project: Option<WeakEntity<Project>>,
        headless: bool,
        cx: &mut Context<Self>,
    ) -> Self {
        Self::new_internal(
            !headless,
            None,
            ContextServerDescriptorRegistry::default_global(cx),
            worktree_store,
            weak_project,
            ContextServerStoreState::Local {
                downstream_client: None,
                is_headless: headless,
            },
            cx,
        )
    }
245
    /// Creates a store for a remote project.
    ///
    /// The server maintenance loop is always enabled. `project_id` and
    /// `upstream_client` are kept so that server commands can later be
    /// requested from the upstream side.
    pub fn remote(
        project_id: u64,
        upstream_client: Entity<RemoteClient>,
        worktree_store: Entity<WorktreeStore>,
        weak_project: Option<WeakEntity<Project>>,
        cx: &mut Context<Self>,
    ) -> Self {
        Self::new_internal(
            true,
            None,
            ContextServerDescriptorRegistry::default_global(cx),
            worktree_store,
            weak_project,
            ContextServerStoreState::Remote {
                project_id,
                upstream_client,
            },
            cx,
        )
    }
266
    /// Registers `handle_get_context_server_command` as an entity request
    /// handler on `session`.
    pub fn init_headless(session: &AnyProtoClient) {
        session.add_entity_request_handler(Self::handle_get_context_server_command);
    }
270
271    pub fn shared(&mut self, project_id: u64, client: AnyProtoClient) {
272        if let ContextServerStoreState::Local {
273            downstream_client, ..
274        } = &mut self.state
275        {
276            *downstream_client = Some((project_id, client));
277        }
278    }
279
    /// Returns `true` when this store was created for a remote project.
    pub fn is_remote_project(&self) -> bool {
        matches!(self.state, ContextServerStoreState::Remote { .. })
    }
283
284    /// Returns all configured context server ids, excluding the ones that are disabled
285    pub fn configured_server_ids(&self) -> Vec<ContextServerId> {
286        self.context_server_settings
287            .iter()
288            .filter(|(_, settings)| settings.enabled())
289            .map(|(id, _)| ContextServerId(id.clone()))
290            .collect()
291    }
292
    /// Test-only constructor: a local, non-headless store with the given
    /// registry, with the server maintenance loop disabled and no server
    /// factory override.
    #[cfg(feature = "test-support")]
    pub fn test(
        registry: Entity<ContextServerDescriptorRegistry>,
        worktree_store: Entity<WorktreeStore>,
        weak_project: Option<WeakEntity<Project>>,
        cx: &mut Context<Self>,
    ) -> Self {
        Self::new_internal(
            false,
            None,
            registry,
            worktree_store,
            weak_project,
            ContextServerStoreState::Local {
                downstream_client: None,
                is_headless: false,
            },
            cx,
        )
    }
313
    /// Test-only constructor: a local, non-headless store with the server
    /// maintenance loop enabled and an optional server factory override.
    #[cfg(feature = "test-support")]
    pub fn test_maintain_server_loop(
        context_server_factory: Option<ContextServerFactory>,
        registry: Entity<ContextServerDescriptorRegistry>,
        worktree_store: Entity<WorktreeStore>,
        weak_project: Option<WeakEntity<Project>>,
        cx: &mut Context<Self>,
    ) -> Self {
        Self::new_internal(
            true,
            context_server_factory,
            registry,
            worktree_store,
            weak_project,
            ContextServerStoreState::Local {
                downstream_client: None,
                is_headless: false,
            },
            cx,
        )
    }
335
    /// Test-only: installs the factory used to construct context servers,
    /// replacing any previously set one.
    #[cfg(feature = "test-support")]
    pub fn set_context_server_factory(&mut self, factory: ContextServerFactory) {
        self.context_server_factory = Some(factory);
    }
340
    /// Test-only: returns the descriptor registry this store observes.
    #[cfg(feature = "test-support")]
    pub fn registry(&self) -> &Entity<ContextServerDescriptorRegistry> {
        &self.registry
    }
345
346    #[cfg(feature = "test-support")]
347    pub fn test_start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
348        let configuration = Arc::new(ContextServerConfiguration::Custom {
349            command: ContextServerCommand {
350                path: "test".into(),
351                args: vec![],
352                env: None,
353                timeout: None,
354            },
355            remote: false,
356        });
357        self.run_server(server, configuration, cx);
358    }
359
    /// Shared constructor behind `local`, `remote` and the test constructors.
    ///
    /// Always subscribes to `SettingsStore` changes so that the cached context
    /// server settings and the "AI disabled" flag stay current. When
    /// `maintain_server_loop` is `true`, it additionally:
    /// - stops every tracked server while AI is disabled,
    /// - re-evaluates the servers when AI is re-enabled or the settings change,
    /// - re-evaluates the servers when the descriptor registry changes (unless
    ///   AI is disabled),
    /// - kicks off an initial evaluation unless AI is disabled.
    fn new_internal(
        maintain_server_loop: bool,
        context_server_factory: Option<ContextServerFactory>,
        registry: Entity<ContextServerDescriptorRegistry>,
        worktree_store: Entity<WorktreeStore>,
        weak_project: Option<WeakEntity<Project>>,
        state: ContextServerStoreState,
        cx: &mut Context<Self>,
    ) -> Self {
        let mut subscriptions = vec![cx.observe_global::<SettingsStore>(move |this, cx| {
            // Remember the previous flag so a disabled -> enabled transition can be detected.
            let ai_disabled = DisableAiSettings::get_global(cx).disable_ai;
            let ai_was_disabled = this.ai_disabled;
            this.ai_disabled = ai_disabled;

            let settings =
                &Self::resolve_project_settings(&this.worktree_store, cx).context_servers;
            let settings_changed = &this.context_server_settings != settings;

            // The cached settings are refreshed even when the maintenance loop is off.
            if settings_changed {
                this.context_server_settings = settings.clone();
            }

            // When AI is disabled, stop all running servers
            if ai_disabled && maintain_server_loop {
                // Collect the ids first: `stop_server` mutates `this.servers`.
                let server_ids: Vec<_> = this.servers.keys().cloned().collect();
                for id in server_ids {
                    this.stop_server(&id, cx).log_err();
                }
                return;
            }

            // Trigger updates if AI was re-enabled or settings changed
            if maintain_server_loop && (ai_was_disabled || settings_changed) {
                this.available_context_servers_changed(cx);
            }
        })];

        if maintain_server_loop {
            // Registry changes only matter while AI is enabled.
            subscriptions.push(cx.observe(&registry, |this, _registry, cx| {
                if !DisableAiSettings::get_global(cx).disable_ai {
                    this.available_context_servers_changed(cx);
                }
            }));
        }

        let ai_disabled = DisableAiSettings::get_global(cx).disable_ai;
        let mut this = Self {
            state,
            _subscriptions: subscriptions,
            context_server_settings: Self::resolve_project_settings(&worktree_store, cx)
                .context_servers
                .clone(),
            worktree_store,
            project: weak_project,
            registry,
            needs_server_update: false,
            ai_disabled,
            servers: HashMap::default(),
            server_ids: Default::default(),
            update_servers_task: None,
            context_server_factory,
        };
        // Initial evaluation of the configured servers.
        if maintain_server_loop && !DisableAiSettings::get_global(cx).disable_ai {
            this.available_context_servers_changed(cx);
        }
        this
    }
427
428    pub fn get_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
429        self.servers.get(id).map(|state| state.server())
430    }
431
432    pub fn get_running_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
433        if let Some(ContextServerState::Running { server, .. }) = self.servers.get(id) {
434            Some(server.clone())
435        } else {
436            None
437        }
438    }
439
    /// Returns the public status of the server registered under `id`, or
    /// `None` if the store does not track it.
    pub fn status_for_server(&self, id: &ContextServerId) -> Option<ContextServerStatus> {
        self.servers.get(id).map(ContextServerStatus::from_state)
    }
443
444    pub fn configuration_for_server(
445        &self,
446        id: &ContextServerId,
447    ) -> Option<Arc<ContextServerConfiguration>> {
448        self.servers.get(id).map(|state| state.configuration())
449    }
450
451    /// Returns a sorted slice of available unique context server IDs. Within the
452    /// slice, context servers which have `mcp-server-` as a prefix in their ID will
453    /// appear after servers that do not have this prefix in their ID.
454    pub fn server_ids(&self) -> &[ContextServerId] {
455        self.server_ids.as_slice()
456    }
457
458    fn populate_server_ids(&mut self, cx: &App) {
459        self.server_ids = self
460            .servers
461            .keys()
462            .cloned()
463            .chain(
464                self.registry
465                    .read(cx)
466                    .context_server_descriptors()
467                    .into_iter()
468                    .map(|(id, _)| ContextServerId(id)),
469            )
470            .chain(
471                self.context_server_settings
472                    .keys()
473                    .map(|id| ContextServerId(id.clone())),
474            )
475            .unique()
476            .sorted_unstable_by(
477                // Sort context servers: ones without mcp-server- prefix first, then prefixed ones
478                |a, b| {
479                    const MCP_PREFIX: &str = "mcp-server-";
480                    match (a.0.strip_prefix(MCP_PREFIX), b.0.strip_prefix(MCP_PREFIX)) {
481                        // If one has mcp-server- prefix and other doesn't, non-mcp comes first
482                        (Some(_), None) => std::cmp::Ordering::Greater,
483                        (None, Some(_)) => std::cmp::Ordering::Less,
484                        // If both have same prefix status, sort by appropriate key
485                        (Some(a), Some(b)) => a.cmp(b),
486                        (None, None) => a.0.cmp(&b.0),
487                    }
488                },
489            )
490            .collect();
491    }
492
493    pub fn running_servers(&self) -> Vec<Arc<ContextServer>> {
494        self.servers
495            .values()
496            .filter_map(|state| {
497                if let ContextServerState::Running { server, .. } = state {
498                    Some(server.clone())
499                } else {
500                    None
501                }
502            })
503            .collect()
504    }
505
    /// Starts `server` asynchronously using the settings stored under its id.
    ///
    /// The spawned task is detached and any error is logged. It:
    /// - fails if the store was dropped or no settings exist for the server,
    /// - returns without doing anything if the settings mark the server as disabled,
    /// - fails if no configuration can be built from the settings,
    /// - otherwise hands the server and configuration to `run_server`.
    pub fn start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
        cx.spawn(async move |this, cx| {
            let this = this.upgrade().context("Context server store dropped")?;
            let settings = this
                .update(cx, |this, _| {
                    this.context_server_settings.get(&server.id().0).cloned()
                })
                .context("Failed to get context server settings")?;

            // Disabled servers are silently skipped rather than reported as errors.
            if !settings.enabled() {
                return anyhow::Ok(());
            }

            let (registry, worktree_store) = this.update(cx, |this, _| {
                (this.registry.clone(), this.worktree_store.clone())
            });
            let configuration = ContextServerConfiguration::from_settings(
                settings,
                server.id(),
                registry,
                worktree_store,
                cx,
            )
            .await
            .context("Failed to create context server configuration")?;

            this.update(cx, |this, cx| {
                this.run_server(server, Arc::new(configuration), cx)
            });
            Ok(())
        })
        .detach_and_log_err(cx);
    }
539
540    pub fn stop_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
541        if matches!(
542            self.servers.get(id),
543            Some(ContextServerState::Stopped { .. })
544        ) {
545            return Ok(());
546        }
547
548        let state = self
549            .servers
550            .remove(id)
551            .context("Context server not found")?;
552
553        let server = state.server();
554        let configuration = state.configuration();
555        let mut result = Ok(());
556        if let ContextServerState::Running { server, .. } = &state {
557            result = server.stop();
558        }
559        drop(state);
560
561        self.update_server_state(
562            id.clone(),
563            ContextServerState::Stopped {
564                configuration,
565                server,
566            },
567            cx,
568        );
569
570        result
571    }
572
    /// Starts `server` with `configuration` and tracks it in the store.
    ///
    /// If a server with the same id is already starting or running, it is
    /// stopped first. A task is spawned that awaits the server's start and then
    /// records either the running state or the error state (with the error
    /// message). The server is registered as starting right after the task is
    /// spawned, and the task is stored inside that state.
    fn run_server(
        &mut self,
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
        cx: &mut Context<Self>,
    ) {
        let id = server.id();
        if matches!(
            self.servers.get(&id),
            Some(ContextServerState::Starting { .. } | ContextServerState::Running { .. })
        ) {
            self.stop_server(&id, cx).log_err();
        }
        let task = cx.spawn({
            // Clones moved into the task; the originals go into the `Starting` state below.
            let id = server.id();
            let server = server.clone();
            let configuration = configuration.clone();

            async move |this, cx| {
                match server.clone().start(cx).await {
                    Ok(_) => {
                        debug_assert!(server.client().is_some());

                        this.update(cx, |this, cx| {
                            this.update_server_state(
                                id.clone(),
                                ContextServerState::Running {
                                    server,
                                    configuration,
                                },
                                cx,
                            )
                        })
                        .log_err()
                    }
                    Err(err) => {
                        log::error!("{} context server failed to start: {}", id, err);
                        this.update(cx, |this, cx| {
                            this.update_server_state(
                                id.clone(),
                                ContextServerState::Error {
                                    configuration,
                                    server,
                                    error: err.to_string().into(),
                                },
                                cx,
                            )
                        })
                        .log_err()
                    }
                };
            }
        });

        self.update_server_state(
            id.clone(),
            ContextServerState::Starting {
                configuration,
                _task: task,
                server,
            },
            cx,
        );
    }
637
638    fn remove_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
639        let state = self
640            .servers
641            .remove(id)
642            .context("Context server not found")?;
643        drop(state);
644        cx.emit(ServerStatusChangedEvent {
645            server_id: id.clone(),
646            status: ContextServerStatus::Stopped,
647        });
648        Ok(())
649    }
650
    /// Builds a [`ContextServer`] for `id` from `configuration`.
    ///
    /// Returns the server together with the configuration that was actually
    /// used, which differs from the input when the command had to be fetched
    /// from the upstream client.
    ///
    /// Steps:
    /// 1. For a remote store and a command-based configuration flagged
    ///    `remote`, request the command from the upstream client and wrap it
    ///    into a new custom configuration.
    /// 2. If a server factory is set, delegate construction to it.
    /// 3. Otherwise build an HTTP or stdio server. The timeout is the
    ///    per-server value if present, else the global setting, clamped to
    ///    `MAX_TIMEOUT_SECS`.
    ///
    /// # Errors
    ///
    /// Fails if the store was dropped, the upstream request or command
    /// construction fails, the HTTP server cannot be created, or a non-HTTP
    /// configuration has no command.
    pub async fn create_context_server(
        this: WeakEntity<Self>,
        id: ContextServerId,
        configuration: Arc<ContextServerConfiguration>,
        cx: &mut AsyncApp,
    ) -> Result<(Arc<ContextServer>, Arc<ContextServerConfiguration>)> {
        let remote = configuration.remote();
        let needs_remote_command = match configuration.as_ref() {
            ContextServerConfiguration::Custom { .. }
            | ContextServerConfiguration::Extension { .. } => remote,
            ContextServerConfiguration::Http { .. } => false,
        };

        // `remote_state` is only populated when the store is remote AND the
        // configuration asks for a remote command.
        let (remote_state, is_remote_project) = this.update(cx, |this, _| {
            let remote_state = match &this.state {
                ContextServerStoreState::Remote {
                    project_id,
                    upstream_client,
                } if needs_remote_command => Some((*project_id, upstream_client.clone())),
                _ => None,
            };
            (remote_state, this.is_remote_project())
        })?;

        // Prefer the project's active directory; fall back to the root
        // directory of the first visible worktree that has one.
        let root_path: Option<Arc<Path>> = this.update(cx, |this, cx| {
            this.project
                .as_ref()
                .and_then(|project| {
                    project
                        .read_with(cx, |project, cx| project.active_project_directory(cx))
                        .ok()
                        .flatten()
                })
                .or_else(|| {
                    this.worktree_store.read_with(cx, |store, cx| {
                        store.visible_worktrees(cx).fold(None, |acc, item| {
                            if acc.is_none() {
                                item.read(cx).root_dir()
                            } else {
                                acc
                            }
                        })
                    })
                })
        })?;

        let configuration = if let Some((project_id, upstream_client)) = remote_state {
            let root_dir = root_path.as_ref().map(|p| p.display().to_string());

            // Ask the upstream side which command it would run for this server.
            let response = upstream_client
                .update(cx, |client, _| {
                    client
                        .proto_client()
                        .request(proto::GetContextServerCommand {
                            project_id,
                            server_id: id.0.to_string(),
                            root_dir: root_dir.clone(),
                        })
                })
                .await?;

            // Let the remote client turn the response into the command to spawn.
            let remote_command = upstream_client.update(cx, |client, _| {
                client.build_command(
                    Some(response.path),
                    &response.args,
                    &response.env.into_iter().collect(),
                    root_dir,
                    None,
                )
            })?;

            // No per-server timeout is carried over here, so the global
            // timeout applies further down.
            let command = ContextServerCommand {
                path: remote_command.program.into(),
                args: remote_command.args,
                env: Some(remote_command.env.into_iter().collect()),
                timeout: None,
            };

            Arc::new(ContextServerConfiguration::Custom { command, remote })
        } else {
            configuration
        };

        let server: Arc<ContextServer> = this.update(cx, |this, cx| {
            let global_timeout =
                Self::resolve_project_settings(&this.worktree_store, cx).context_server_timeout;

            // A configured factory takes precedence over the built-in construction.
            if let Some(factory) = this.context_server_factory.as_ref() {
                return anyhow::Ok(factory(id.clone(), configuration.clone()));
            }

            match configuration.as_ref() {
                ContextServerConfiguration::Http {
                    url,
                    headers,
                    timeout,
                } => anyhow::Ok(Arc::new(ContextServer::http(
                    id,
                    url,
                    headers.clone(),
                    cx.http_client(),
                    cx.background_executor().clone(),
                    Some(Duration::from_secs(
                        timeout.unwrap_or(global_timeout).min(MAX_TIMEOUT_SECS),
                    )),
                )?)),
                _ => {
                    let mut command = configuration
                        .command()
                        .context("Missing command configuration for stdio context server")?
                        .clone();
                    command.timeout = Some(
                        command
                            .timeout
                            .unwrap_or(global_timeout)
                            .min(MAX_TIMEOUT_SECS),
                    );

                    // Don't pass remote paths as working directory for locally-spawned processes
                    let working_directory = if is_remote_project { None } else { root_path };
                    anyhow::Ok(Arc::new(ContextServer::stdio(
                        id,
                        command,
                        working_directory,
                    )))
                }
            }
        })??;

        Ok((server, configuration))
    }
782
    /// Handles a `GetContextServerCommand` request by resolving the command
    /// for the requested server and returning it as a proto message.
    ///
    /// Settings are looked up by server id; if none exist but the registry
    /// has a descriptor for the id, default extension settings are used.
    ///
    /// # Errors
    ///
    /// Fails if the store is not a headless local store, the server is
    /// unknown, no configuration can be built, or the configuration has no
    /// command (HTTP servers).
    async fn handle_get_context_server_command(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::GetContextServerCommand>,
        mut cx: AsyncApp,
    ) -> Result<proto::ContextServerCommand> {
        let server_id = ContextServerId(envelope.payload.server_id.into());

        let (settings, registry, worktree_store) = this.update(&mut cx, |this, inner_cx| {
            // Only a local store with `is_headless: true` passes this check.
            let ContextServerStoreState::Local {
                is_headless: true, ..
            } = &this.state
            else {
                anyhow::bail!("unexpected GetContextServerCommand request in a non-local project");
            };

            let settings = this
                .context_server_settings
                .get(&server_id.0)
                .cloned()
                .or_else(|| {
                    this.registry
                        .read(inner_cx)
                        .context_server_descriptor(&server_id.0)
                        .map(|_| ContextServerSettings::default_extension())
                })
                .with_context(|| format!("context server `{}` not found", server_id))?;

            anyhow::Ok((settings, this.registry.clone(), this.worktree_store.clone()))
        })?;

        let configuration = ContextServerConfiguration::from_settings(
            settings,
            server_id.clone(),
            registry,
            worktree_store,
            &cx,
        )
        .await
        .with_context(|| format!("failed to build configuration for `{}`", server_id))?;

        let command = configuration
            .command()
            .context("context server has no command (HTTP servers don't need RPC)")?;

        // A missing environment is sent as an empty collection.
        Ok(proto::ContextServerCommand {
            path: command.path.display().to_string(),
            args: command.args.clone(),
            env: command
                .env
                .clone()
                .map(|env| env.into_iter().collect())
                .unwrap_or_default(),
        })
    }
837
838    fn resolve_project_settings<'a>(
839        worktree_store: &'a Entity<WorktreeStore>,
840        cx: &'a App,
841    ) -> &'a ProjectSettings {
842        let location = worktree_store
843            .read(cx)
844            .visible_worktrees(cx)
845            .next()
846            .map(|worktree| settings::SettingsLocation {
847                worktree_id: worktree.read(cx).id(),
848                path: RelPath::empty(),
849            });
850        ProjectSettings::get(location, cx)
851    }
852
853    fn update_server_state(
854        &mut self,
855        id: ContextServerId,
856        state: ContextServerState,
857        cx: &mut Context<Self>,
858    ) {
859        let status = ContextServerStatus::from_state(&state);
860        self.servers.insert(id.clone(), state);
861        cx.emit(ServerStatusChangedEvent {
862            server_id: id,
863            status,
864        });
865    }
866
867    fn available_context_servers_changed(&mut self, cx: &mut Context<Self>) {
868        if self.update_servers_task.is_some() {
869            self.needs_server_update = true;
870        } else {
871            self.needs_server_update = false;
872            self.update_servers_task = Some(cx.spawn(async move |this, cx| {
873                if let Err(err) = Self::maintain_servers(this.clone(), cx).await {
874                    log::error!("Error maintaining context servers: {}", err);
875                }
876
877                this.update(cx, |this, cx| {
878                    this.populate_server_ids(cx);
879                    this.update_servers_task.take();
880                    if this.needs_server_update {
881                        this.available_context_servers_changed(cx);
882                    }
883                })?;
884
885                Ok(())
886            }));
887        }
888    }
889
890    async fn maintain_servers(this: WeakEntity<Self>, cx: &mut AsyncApp) -> Result<()> {
891        let (mut configured_servers, registry, worktree_store) = this.update(cx, |this, _| {
892            (
893                this.context_server_settings.clone(),
894                this.registry.clone(),
895                this.worktree_store.clone(),
896            )
897        })?;
898
899        for (id, _) in registry.read_with(cx, |registry, _| registry.context_server_descriptors()) {
900            configured_servers
901                .entry(id)
902                .or_insert(ContextServerSettings::default_extension());
903        }
904
905        let (enabled_servers, disabled_servers): (HashMap<_, _>, HashMap<_, _>) =
906            configured_servers
907                .into_iter()
908                .partition(|(_, settings)| settings.enabled());
909
910        let configured_servers = join_all(enabled_servers.into_iter().map(|(id, settings)| {
911            let id = ContextServerId(id);
912            ContextServerConfiguration::from_settings(
913                settings,
914                id.clone(),
915                registry.clone(),
916                worktree_store.clone(),
917                cx,
918            )
919            .map(move |config| (id, config))
920        }))
921        .await
922        .into_iter()
923        .filter_map(|(id, config)| config.map(|config| (id, config)))
924        .collect::<HashMap<_, _>>();
925
926        let mut servers_to_start = Vec::new();
927        let mut servers_to_remove = HashSet::default();
928        let mut servers_to_stop = HashSet::default();
929
930        this.update(cx, |this, _cx| {
931            for server_id in this.servers.keys() {
932                // All servers that are not in desired_servers should be removed from the store.
933                // This can happen if the user removed a server from the context server settings.
934                if !configured_servers.contains_key(server_id) {
935                    if disabled_servers.contains_key(&server_id.0) {
936                        servers_to_stop.insert(server_id.clone());
937                    } else {
938                        servers_to_remove.insert(server_id.clone());
939                    }
940                }
941            }
942
943            for (id, config) in configured_servers {
944                let state = this.servers.get(&id);
945                let is_stopped = matches!(state, Some(ContextServerState::Stopped { .. }));
946                let existing_config = state.as_ref().map(|state| state.configuration());
947                if existing_config.as_deref() != Some(&config) || is_stopped {
948                    let config = Arc::new(config);
949                    servers_to_start.push((id.clone(), config));
950                    if this.servers.contains_key(&id) {
951                        servers_to_stop.insert(id);
952                    }
953                }
954            }
955
956            anyhow::Ok(())
957        })??;
958
959        this.update(cx, |this, inner_cx| {
960            for id in servers_to_stop {
961                this.stop_server(&id, inner_cx)?;
962            }
963            for id in servers_to_remove {
964                this.remove_server(&id, inner_cx)?;
965            }
966            anyhow::Ok(())
967        })??;
968
969        for (id, config) in servers_to_start {
970            let (server, config) =
971                Self::create_context_server(this.clone(), id, config, cx).await?;
972            this.update(cx, |this, cx| {
973                this.run_server(server, config, cx);
974            })?;
975        }
976
977        Ok(())
978    }
979}