context_server_store.rs

  1pub mod extension;
  2pub mod registry;
  3
  4use std::path::Path;
  5use std::sync::Arc;
  6use std::time::Duration;
  7
  8use anyhow::{Context as _, Result};
  9use collections::{HashMap, HashSet};
 10use context_server::{ContextServer, ContextServerCommand, ContextServerId};
 11use futures::{FutureExt as _, future::join_all};
 12use gpui::{App, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, actions};
 13use itertools::Itertools;
 14use registry::ContextServerDescriptorRegistry;
 15use remote::RemoteClient;
 16use rpc::{AnyProtoClient, TypedEnvelope, proto};
 17use settings::{Settings as _, SettingsStore};
 18use util::{ResultExt as _, rel_path::RelPath};
 19
 20use crate::{
 21    DisableAiSettings, Project,
 22    project_settings::{ContextServerSettings, ProjectSettings},
 23    worktree_store::WorktreeStore,
 24};
 25
 26/// Maximum timeout for context server requests
 27/// Prevents extremely large timeout values from tying up resources indefinitely.
 28const MAX_TIMEOUT_SECS: u64 = 600; // 10 minutes
 29
 30pub fn init(cx: &mut App) {
 31    extension::init(cx);
 32}
 33
 34actions!(
 35    context_server,
 36    [
 37        /// Restarts the context server.
 38        Restart
 39    ]
 40);
 41
 42#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 43pub enum ContextServerStatus {
 44    Starting,
 45    Running,
 46    Stopped,
 47    Error(Arc<str>),
 48}
 49
 50impl ContextServerStatus {
 51    fn from_state(state: &ContextServerState) -> Self {
 52        match state {
 53            ContextServerState::Starting { .. } => ContextServerStatus::Starting,
 54            ContextServerState::Running { .. } => ContextServerStatus::Running,
 55            ContextServerState::Stopped { .. } => ContextServerStatus::Stopped,
 56            ContextServerState::Error { error, .. } => ContextServerStatus::Error(error.clone()),
 57        }
 58    }
 59}
 60
 61enum ContextServerState {
 62    Starting {
 63        server: Arc<ContextServer>,
 64        configuration: Arc<ContextServerConfiguration>,
 65        _task: Task<()>,
 66    },
 67    Running {
 68        server: Arc<ContextServer>,
 69        configuration: Arc<ContextServerConfiguration>,
 70    },
 71    Stopped {
 72        server: Arc<ContextServer>,
 73        configuration: Arc<ContextServerConfiguration>,
 74    },
 75    Error {
 76        server: Arc<ContextServer>,
 77        configuration: Arc<ContextServerConfiguration>,
 78        error: Arc<str>,
 79    },
 80}
 81
 82impl ContextServerState {
 83    pub fn server(&self) -> Arc<ContextServer> {
 84        match self {
 85            ContextServerState::Starting { server, .. } => server.clone(),
 86            ContextServerState::Running { server, .. } => server.clone(),
 87            ContextServerState::Stopped { server, .. } => server.clone(),
 88            ContextServerState::Error { server, .. } => server.clone(),
 89        }
 90    }
 91
 92    pub fn configuration(&self) -> Arc<ContextServerConfiguration> {
 93        match self {
 94            ContextServerState::Starting { configuration, .. } => configuration.clone(),
 95            ContextServerState::Running { configuration, .. } => configuration.clone(),
 96            ContextServerState::Stopped { configuration, .. } => configuration.clone(),
 97            ContextServerState::Error { configuration, .. } => configuration.clone(),
 98        }
 99    }
100}
101
102#[derive(Debug, PartialEq, Eq)]
103pub enum ContextServerConfiguration {
104    Custom {
105        command: ContextServerCommand,
106        remote: bool,
107    },
108    Extension {
109        command: ContextServerCommand,
110        settings: serde_json::Value,
111        remote: bool,
112    },
113    Http {
114        url: url::Url,
115        headers: HashMap<String, String>,
116        timeout: Option<u64>,
117    },
118}
119
120impl ContextServerConfiguration {
121    pub fn command(&self) -> Option<&ContextServerCommand> {
122        match self {
123            ContextServerConfiguration::Custom { command, .. } => Some(command),
124            ContextServerConfiguration::Extension { command, .. } => Some(command),
125            ContextServerConfiguration::Http { .. } => None,
126        }
127    }
128
129    pub fn remote(&self) -> bool {
130        match self {
131            ContextServerConfiguration::Custom { remote, .. } => *remote,
132            ContextServerConfiguration::Extension { remote, .. } => *remote,
133            ContextServerConfiguration::Http { .. } => false,
134        }
135    }
136
137    pub async fn from_settings(
138        settings: ContextServerSettings,
139        id: ContextServerId,
140        registry: Entity<ContextServerDescriptorRegistry>,
141        worktree_store: Entity<WorktreeStore>,
142        cx: &AsyncApp,
143    ) -> Option<Self> {
144        match settings {
145            ContextServerSettings::Stdio {
146                enabled: _,
147                command,
148                remote,
149            } => Some(ContextServerConfiguration::Custom { command, remote }),
150            ContextServerSettings::Extension {
151                enabled: _,
152                settings,
153                remote,
154            } => {
155                let descriptor =
156                    cx.update(|cx| registry.read(cx).context_server_descriptor(&id.0))?;
157
158                match descriptor.command(worktree_store, cx).await {
159                    Ok(command) => Some(ContextServerConfiguration::Extension {
160                        command,
161                        settings,
162                        remote,
163                    }),
164                    Err(e) => {
165                        log::error!(
166                            "Failed to create context server configuration from settings: {e:#}"
167                        );
168                        None
169                    }
170                }
171            }
172            ContextServerSettings::Http {
173                enabled: _,
174                url,
175                headers: auth,
176                timeout,
177            } => {
178                let url = url::Url::parse(&url).log_err()?;
179                Some(ContextServerConfiguration::Http {
180                    url,
181                    headers: auth,
182                    timeout,
183                })
184            }
185        }
186    }
187}
188
189pub type ContextServerFactory =
190    Box<dyn Fn(ContextServerId, Arc<ContextServerConfiguration>) -> Arc<ContextServer>>;
191
192enum ContextServerStoreState {
193    Local {
194        downstream_client: Option<(u64, AnyProtoClient)>,
195        is_headless: bool,
196    },
197    Remote {
198        project_id: u64,
199        upstream_client: Entity<RemoteClient>,
200    },
201}
202
203pub struct ContextServerStore {
204    state: ContextServerStoreState,
205    context_server_settings: HashMap<Arc<str>, ContextServerSettings>,
206    servers: HashMap<ContextServerId, ContextServerState>,
207    server_ids: Vec<ContextServerId>,
208    worktree_store: Entity<WorktreeStore>,
209    project: Option<WeakEntity<Project>>,
210    registry: Entity<ContextServerDescriptorRegistry>,
211    update_servers_task: Option<Task<Result<()>>>,
212    context_server_factory: Option<ContextServerFactory>,
213    needs_server_update: bool,
214    _subscriptions: Vec<Subscription>,
215}
216
217pub struct ServerStatusChangedEvent {
218    pub server_id: ContextServerId,
219    pub status: ContextServerStatus,
220}
221
222impl EventEmitter<ServerStatusChangedEvent> for ContextServerStore {}
223
224impl ContextServerStore {
225    pub fn local(
226        worktree_store: Entity<WorktreeStore>,
227        weak_project: Option<WeakEntity<Project>>,
228        headless: bool,
229        cx: &mut Context<Self>,
230    ) -> Self {
231        Self::new_internal(
232            !headless,
233            None,
234            ContextServerDescriptorRegistry::default_global(cx),
235            worktree_store,
236            weak_project,
237            ContextServerStoreState::Local {
238                downstream_client: None,
239                is_headless: headless,
240            },
241            cx,
242        )
243    }
244
245    pub fn remote(
246        project_id: u64,
247        upstream_client: Entity<RemoteClient>,
248        worktree_store: Entity<WorktreeStore>,
249        weak_project: Option<WeakEntity<Project>>,
250        cx: &mut Context<Self>,
251    ) -> Self {
252        Self::new_internal(
253            true,
254            None,
255            ContextServerDescriptorRegistry::default_global(cx),
256            worktree_store,
257            weak_project,
258            ContextServerStoreState::Remote {
259                project_id,
260                upstream_client,
261            },
262            cx,
263        )
264    }
265
266    pub fn init_headless(session: &AnyProtoClient) {
267        session.add_entity_request_handler(Self::handle_get_context_server_command);
268    }
269
270    pub fn shared(&mut self, project_id: u64, client: AnyProtoClient) {
271        if let ContextServerStoreState::Local {
272            downstream_client, ..
273        } = &mut self.state
274        {
275            *downstream_client = Some((project_id, client));
276        }
277    }
278
279    pub fn is_remote_project(&self) -> bool {
280        matches!(self.state, ContextServerStoreState::Remote { .. })
281    }
282
283    /// Returns all configured context server ids, excluding the ones that are disabled
284    pub fn configured_server_ids(&self) -> Vec<ContextServerId> {
285        self.context_server_settings
286            .iter()
287            .filter(|(_, settings)| settings.enabled())
288            .map(|(id, _)| ContextServerId(id.clone()))
289            .collect()
290    }
291
292    #[cfg(feature = "test-support")]
293    pub fn test(
294        registry: Entity<ContextServerDescriptorRegistry>,
295        worktree_store: Entity<WorktreeStore>,
296        weak_project: Option<WeakEntity<Project>>,
297        cx: &mut Context<Self>,
298    ) -> Self {
299        Self::new_internal(
300            false,
301            None,
302            registry,
303            worktree_store,
304            weak_project,
305            ContextServerStoreState::Local {
306                downstream_client: None,
307                is_headless: false,
308            },
309            cx,
310        )
311    }
312
313    #[cfg(feature = "test-support")]
314    pub fn test_maintain_server_loop(
315        context_server_factory: Option<ContextServerFactory>,
316        registry: Entity<ContextServerDescriptorRegistry>,
317        worktree_store: Entity<WorktreeStore>,
318        weak_project: Option<WeakEntity<Project>>,
319        cx: &mut Context<Self>,
320    ) -> Self {
321        Self::new_internal(
322            true,
323            context_server_factory,
324            registry,
325            worktree_store,
326            weak_project,
327            ContextServerStoreState::Local {
328                downstream_client: None,
329                is_headless: false,
330            },
331            cx,
332        )
333    }
334
335    #[cfg(feature = "test-support")]
336    pub fn set_context_server_factory(&mut self, factory: ContextServerFactory) {
337        self.context_server_factory = Some(factory);
338    }
339
340    #[cfg(feature = "test-support")]
341    pub fn registry(&self) -> &Entity<ContextServerDescriptorRegistry> {
342        &self.registry
343    }
344
345    #[cfg(feature = "test-support")]
346    pub fn test_start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
347        let configuration = Arc::new(ContextServerConfiguration::Custom {
348            command: ContextServerCommand {
349                path: "test".into(),
350                args: vec![],
351                env: None,
352                timeout: None,
353            },
354            remote: false,
355        });
356        self.run_server(server, configuration, cx);
357    }
358
359    fn new_internal(
360        maintain_server_loop: bool,
361        context_server_factory: Option<ContextServerFactory>,
362        registry: Entity<ContextServerDescriptorRegistry>,
363        worktree_store: Entity<WorktreeStore>,
364        weak_project: Option<WeakEntity<Project>>,
365        state: ContextServerStoreState,
366        cx: &mut Context<Self>,
367    ) -> Self {
368        let mut subscriptions = vec![cx.observe_global::<SettingsStore>(move |this, cx| {
369            let settings =
370                &Self::resolve_project_settings(&this.worktree_store, cx).context_servers;
371            if &this.context_server_settings == settings {
372                return;
373            }
374            this.context_server_settings = settings.clone();
375            if maintain_server_loop {
376                this.available_context_servers_changed(cx);
377            }
378        })];
379
380        if maintain_server_loop {
381            subscriptions.push(cx.observe(&registry, |this, _registry, cx| {
382                this.available_context_servers_changed(cx);
383            }));
384        }
385
386        let mut this = Self {
387            state,
388            _subscriptions: subscriptions,
389            context_server_settings: Self::resolve_project_settings(&worktree_store, cx)
390                .context_servers
391                .clone(),
392            worktree_store,
393            project: weak_project,
394            registry,
395            needs_server_update: false,
396            servers: HashMap::default(),
397            server_ids: Default::default(),
398            update_servers_task: None,
399            context_server_factory,
400        };
401        if maintain_server_loop {
402            this.available_context_servers_changed(cx);
403        }
404        this
405    }
406
407    pub fn get_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
408        self.servers.get(id).map(|state| state.server())
409    }
410
411    pub fn get_running_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
412        if let Some(ContextServerState::Running { server, .. }) = self.servers.get(id) {
413            Some(server.clone())
414        } else {
415            None
416        }
417    }
418
419    pub fn status_for_server(&self, id: &ContextServerId) -> Option<ContextServerStatus> {
420        self.servers.get(id).map(ContextServerStatus::from_state)
421    }
422
423    pub fn configuration_for_server(
424        &self,
425        id: &ContextServerId,
426    ) -> Option<Arc<ContextServerConfiguration>> {
427        self.servers.get(id).map(|state| state.configuration())
428    }
429
430    /// Returns a sorted slice of available unique context server IDs. Within the
431    /// slice, context servers which have `mcp-server-` as a prefix in their ID will
432    /// appear after servers that do not have this prefix in their ID.
433    pub fn server_ids(&self) -> &[ContextServerId] {
434        self.server_ids.as_slice()
435    }
436
437    fn populate_server_ids(&mut self, cx: &App) {
438        self.server_ids = self
439            .servers
440            .keys()
441            .cloned()
442            .chain(
443                self.registry
444                    .read(cx)
445                    .context_server_descriptors()
446                    .into_iter()
447                    .map(|(id, _)| ContextServerId(id)),
448            )
449            .chain(
450                self.context_server_settings
451                    .keys()
452                    .map(|id| ContextServerId(id.clone())),
453            )
454            .unique()
455            .sorted_unstable_by(
456                // Sort context servers: ones without mcp-server- prefix first, then prefixed ones
457                |a, b| {
458                    const MCP_PREFIX: &str = "mcp-server-";
459                    match (a.0.strip_prefix(MCP_PREFIX), b.0.strip_prefix(MCP_PREFIX)) {
460                        // If one has mcp-server- prefix and other doesn't, non-mcp comes first
461                        (Some(_), None) => std::cmp::Ordering::Greater,
462                        (None, Some(_)) => std::cmp::Ordering::Less,
463                        // If both have same prefix status, sort by appropriate key
464                        (Some(a), Some(b)) => a.cmp(b),
465                        (None, None) => a.0.cmp(&b.0),
466                    }
467                },
468            )
469            .collect();
470    }
471
472    pub fn running_servers(&self) -> Vec<Arc<ContextServer>> {
473        self.servers
474            .values()
475            .filter_map(|state| {
476                if let ContextServerState::Running { server, .. } = state {
477                    Some(server.clone())
478                } else {
479                    None
480                }
481            })
482            .collect()
483    }
484
485    pub fn start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
486        cx.spawn(async move |this, cx| {
487            let this = this.upgrade().context("Context server store dropped")?;
488            let settings = this
489                .update(cx, |this, _| {
490                    this.context_server_settings.get(&server.id().0).cloned()
491                })
492                .context("Failed to get context server settings")?;
493
494            if !settings.enabled() {
495                return anyhow::Ok(());
496            }
497
498            let (registry, worktree_store) = this.update(cx, |this, _| {
499                (this.registry.clone(), this.worktree_store.clone())
500            });
501            let configuration = ContextServerConfiguration::from_settings(
502                settings,
503                server.id(),
504                registry,
505                worktree_store,
506                cx,
507            )
508            .await
509            .context("Failed to create context server configuration")?;
510
511            this.update(cx, |this, cx| {
512                this.run_server(server, Arc::new(configuration), cx)
513            });
514            Ok(())
515        })
516        .detach_and_log_err(cx);
517    }
518
519    pub fn stop_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
520        if matches!(
521            self.servers.get(id),
522            Some(ContextServerState::Stopped { .. })
523        ) {
524            return Ok(());
525        }
526
527        let state = self
528            .servers
529            .remove(id)
530            .context("Context server not found")?;
531
532        let server = state.server();
533        let configuration = state.configuration();
534        let mut result = Ok(());
535        if let ContextServerState::Running { server, .. } = &state {
536            result = server.stop();
537        }
538        drop(state);
539
540        self.update_server_state(
541            id.clone(),
542            ContextServerState::Stopped {
543                configuration,
544                server,
545            },
546            cx,
547        );
548
549        result
550    }
551
552    fn run_server(
553        &mut self,
554        server: Arc<ContextServer>,
555        configuration: Arc<ContextServerConfiguration>,
556        cx: &mut Context<Self>,
557    ) {
558        let id = server.id();
559        if matches!(
560            self.servers.get(&id),
561            Some(ContextServerState::Starting { .. } | ContextServerState::Running { .. })
562        ) {
563            self.stop_server(&id, cx).log_err();
564        }
565        let task = cx.spawn({
566            let id = server.id();
567            let server = server.clone();
568            let configuration = configuration.clone();
569
570            async move |this, cx| {
571                match server.clone().start(cx).await {
572                    Ok(_) => {
573                        debug_assert!(server.client().is_some());
574
575                        this.update(cx, |this, cx| {
576                            this.update_server_state(
577                                id.clone(),
578                                ContextServerState::Running {
579                                    server,
580                                    configuration,
581                                },
582                                cx,
583                            )
584                        })
585                        .log_err()
586                    }
587                    Err(err) => {
588                        log::error!("{} context server failed to start: {}", id, err);
589                        this.update(cx, |this, cx| {
590                            this.update_server_state(
591                                id.clone(),
592                                ContextServerState::Error {
593                                    configuration,
594                                    server,
595                                    error: err.to_string().into(),
596                                },
597                                cx,
598                            )
599                        })
600                        .log_err()
601                    }
602                };
603            }
604        });
605
606        self.update_server_state(
607            id.clone(),
608            ContextServerState::Starting {
609                configuration,
610                _task: task,
611                server,
612            },
613            cx,
614        );
615    }
616
617    fn remove_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
618        let state = self
619            .servers
620            .remove(id)
621            .context("Context server not found")?;
622        drop(state);
623        cx.emit(ServerStatusChangedEvent {
624            server_id: id.clone(),
625            status: ContextServerStatus::Stopped,
626        });
627        Ok(())
628    }
629
630    pub async fn create_context_server(
631        this: WeakEntity<Self>,
632        id: ContextServerId,
633        configuration: Arc<ContextServerConfiguration>,
634        cx: &mut AsyncApp,
635    ) -> Result<(Arc<ContextServer>, Arc<ContextServerConfiguration>)> {
636        let remote = configuration.remote();
637        let needs_remote_command = match configuration.as_ref() {
638            ContextServerConfiguration::Custom { .. }
639            | ContextServerConfiguration::Extension { .. } => remote,
640            ContextServerConfiguration::Http { .. } => false,
641        };
642
643        let (remote_state, is_remote_project) = this.update(cx, |this, _| {
644            let remote_state = match &this.state {
645                ContextServerStoreState::Remote {
646                    project_id,
647                    upstream_client,
648                } if needs_remote_command => Some((*project_id, upstream_client.clone())),
649                _ => None,
650            };
651            (remote_state, this.is_remote_project())
652        })?;
653
654        let root_path: Option<Arc<Path>> = this.update(cx, |this, cx| {
655            this.project
656                .as_ref()
657                .and_then(|project| {
658                    project
659                        .read_with(cx, |project, cx| project.active_project_directory(cx))
660                        .ok()
661                        .flatten()
662                })
663                .or_else(|| {
664                    this.worktree_store.read_with(cx, |store, cx| {
665                        store.visible_worktrees(cx).fold(None, |acc, item| {
666                            if acc.is_none() {
667                                item.read(cx).root_dir()
668                            } else {
669                                acc
670                            }
671                        })
672                    })
673                })
674        })?;
675
676        let configuration = if let Some((project_id, upstream_client)) = remote_state {
677            let root_dir = root_path.as_ref().map(|p| p.display().to_string());
678
679            let response = upstream_client
680                .update(cx, |client, _| {
681                    client
682                        .proto_client()
683                        .request(proto::GetContextServerCommand {
684                            project_id,
685                            server_id: id.0.to_string(),
686                            root_dir: root_dir.clone(),
687                        })
688                })
689                .await?;
690
691            let remote_command = upstream_client.update(cx, |client, _| {
692                client.build_command(
693                    Some(response.path),
694                    &response.args,
695                    &response.env.into_iter().collect(),
696                    root_dir,
697                    None,
698                )
699            })?;
700
701            let command = ContextServerCommand {
702                path: remote_command.program.into(),
703                args: remote_command.args,
704                env: Some(remote_command.env.into_iter().collect()),
705                timeout: None,
706            };
707
708            Arc::new(ContextServerConfiguration::Custom { command, remote })
709        } else {
710            configuration
711        };
712
713        let server: Arc<ContextServer> = this.update(cx, |this, cx| {
714            let global_timeout =
715                Self::resolve_project_settings(&this.worktree_store, cx).context_server_timeout;
716
717            if let Some(factory) = this.context_server_factory.as_ref() {
718                return anyhow::Ok(factory(id.clone(), configuration.clone()));
719            }
720
721            match configuration.as_ref() {
722                ContextServerConfiguration::Http {
723                    url,
724                    headers,
725                    timeout,
726                } => anyhow::Ok(Arc::new(ContextServer::http(
727                    id,
728                    url,
729                    headers.clone(),
730                    cx.http_client(),
731                    cx.background_executor().clone(),
732                    Some(Duration::from_secs(
733                        timeout.unwrap_or(global_timeout).min(MAX_TIMEOUT_SECS),
734                    )),
735                )?)),
736                _ => {
737                    let mut command = configuration
738                        .command()
739                        .context("Missing command configuration for stdio context server")?
740                        .clone();
741                    command.timeout = Some(
742                        command
743                            .timeout
744                            .unwrap_or(global_timeout)
745                            .min(MAX_TIMEOUT_SECS),
746                    );
747
748                    // Don't pass remote paths as working directory for locally-spawned processes
749                    let working_directory = if is_remote_project { None } else { root_path };
750                    anyhow::Ok(Arc::new(ContextServer::stdio(
751                        id,
752                        command,
753                        working_directory,
754                    )))
755                }
756            }
757        })??;
758
759        Ok((server, configuration))
760    }
761
762    async fn handle_get_context_server_command(
763        this: Entity<Self>,
764        envelope: TypedEnvelope<proto::GetContextServerCommand>,
765        mut cx: AsyncApp,
766    ) -> Result<proto::ContextServerCommand> {
767        let server_id = ContextServerId(envelope.payload.server_id.into());
768
769        let (settings, registry, worktree_store) = this.update(&mut cx, |this, inner_cx| {
770            let ContextServerStoreState::Local {
771                is_headless: true, ..
772            } = &this.state
773            else {
774                anyhow::bail!("unexpected GetContextServerCommand request in a non-local project");
775            };
776
777            let settings = this
778                .context_server_settings
779                .get(&server_id.0)
780                .cloned()
781                .or_else(|| {
782                    this.registry
783                        .read(inner_cx)
784                        .context_server_descriptor(&server_id.0)
785                        .map(|_| ContextServerSettings::default_extension())
786                })
787                .with_context(|| format!("context server `{}` not found", server_id))?;
788
789            anyhow::Ok((settings, this.registry.clone(), this.worktree_store.clone()))
790        })?;
791
792        let configuration = ContextServerConfiguration::from_settings(
793            settings,
794            server_id.clone(),
795            registry,
796            worktree_store,
797            &cx,
798        )
799        .await
800        .with_context(|| format!("failed to build configuration for `{}`", server_id))?;
801
802        let command = configuration
803            .command()
804            .context("context server has no command (HTTP servers don't need RPC)")?;
805
806        Ok(proto::ContextServerCommand {
807            path: command.path.display().to_string(),
808            args: command.args.clone(),
809            env: command
810                .env
811                .clone()
812                .map(|env| env.into_iter().collect())
813                .unwrap_or_default(),
814        })
815    }
816
817    fn resolve_project_settings<'a>(
818        worktree_store: &'a Entity<WorktreeStore>,
819        cx: &'a App,
820    ) -> &'a ProjectSettings {
821        let location = worktree_store
822            .read(cx)
823            .visible_worktrees(cx)
824            .next()
825            .map(|worktree| settings::SettingsLocation {
826                worktree_id: worktree.read(cx).id(),
827                path: RelPath::empty(),
828            });
829        ProjectSettings::get(location, cx)
830    }
831
832    fn update_server_state(
833        &mut self,
834        id: ContextServerId,
835        state: ContextServerState,
836        cx: &mut Context<Self>,
837    ) {
838        let status = ContextServerStatus::from_state(&state);
839        self.servers.insert(id.clone(), state);
840        cx.emit(ServerStatusChangedEvent {
841            server_id: id,
842            status,
843        });
844    }
845
846    fn available_context_servers_changed(&mut self, cx: &mut Context<Self>) {
847        if self.update_servers_task.is_some() {
848            self.needs_server_update = true;
849        } else {
850            self.needs_server_update = false;
851            self.update_servers_task = Some(cx.spawn(async move |this, cx| {
852                if let Err(err) = Self::maintain_servers(this.clone(), cx).await {
853                    log::error!("Error maintaining context servers: {}", err);
854                }
855
856                this.update(cx, |this, cx| {
857                    this.populate_server_ids(cx);
858                    this.update_servers_task.take();
859                    if this.needs_server_update {
860                        this.available_context_servers_changed(cx);
861                    }
862                })?;
863
864                Ok(())
865            }));
866        }
867    }
868
869    async fn maintain_servers(this: WeakEntity<Self>, cx: &mut AsyncApp) -> Result<()> {
870        // Don't start context servers if AI is disabled
871        let ai_disabled = this.update(cx, |_, cx| DisableAiSettings::get_global(cx).disable_ai)?;
872        if ai_disabled {
873            // Stop all running servers when AI is disabled
874            this.update(cx, |this, cx| {
875                let server_ids: Vec<_> = this.servers.keys().cloned().collect();
876                for id in server_ids {
877                    let _ = this.stop_server(&id, cx);
878                }
879            })?;
880            return Ok(());
881        }
882
883        let (mut configured_servers, registry, worktree_store) = this.update(cx, |this, _| {
884            (
885                this.context_server_settings.clone(),
886                this.registry.clone(),
887                this.worktree_store.clone(),
888            )
889        })?;
890
891        for (id, _) in registry.read_with(cx, |registry, _| registry.context_server_descriptors()) {
892            configured_servers
893                .entry(id)
894                .or_insert(ContextServerSettings::default_extension());
895        }
896
897        let (enabled_servers, disabled_servers): (HashMap<_, _>, HashMap<_, _>) =
898            configured_servers
899                .into_iter()
900                .partition(|(_, settings)| settings.enabled());
901
902        let configured_servers = join_all(enabled_servers.into_iter().map(|(id, settings)| {
903            let id = ContextServerId(id);
904            ContextServerConfiguration::from_settings(
905                settings,
906                id.clone(),
907                registry.clone(),
908                worktree_store.clone(),
909                cx,
910            )
911            .map(move |config| (id, config))
912        }))
913        .await
914        .into_iter()
915        .filter_map(|(id, config)| config.map(|config| (id, config)))
916        .collect::<HashMap<_, _>>();
917
918        let mut servers_to_start = Vec::new();
919        let mut servers_to_remove = HashSet::default();
920        let mut servers_to_stop = HashSet::default();
921
922        this.update(cx, |this, _cx| {
923            for server_id in this.servers.keys() {
924                // All servers that are not in desired_servers should be removed from the store.
925                // This can happen if the user removed a server from the context server settings.
926                if !configured_servers.contains_key(server_id) {
927                    if disabled_servers.contains_key(&server_id.0) {
928                        servers_to_stop.insert(server_id.clone());
929                    } else {
930                        servers_to_remove.insert(server_id.clone());
931                    }
932                }
933            }
934
935            for (id, config) in configured_servers {
936                let state = this.servers.get(&id);
937                let is_stopped = matches!(state, Some(ContextServerState::Stopped { .. }));
938                let existing_config = state.as_ref().map(|state| state.configuration());
939                if existing_config.as_deref() != Some(&config) || is_stopped {
940                    let config = Arc::new(config);
941                    servers_to_start.push((id.clone(), config));
942                    if this.servers.contains_key(&id) {
943                        servers_to_stop.insert(id);
944                    }
945                }
946            }
947
948            anyhow::Ok(())
949        })??;
950
951        this.update(cx, |this, inner_cx| {
952            for id in servers_to_stop {
953                this.stop_server(&id, inner_cx)?;
954            }
955            for id in servers_to_remove {
956                this.remove_server(&id, inner_cx)?;
957            }
958            anyhow::Ok(())
959        })??;
960
961        for (id, config) in servers_to_start {
962            let (server, config) =
963                Self::create_context_server(this.clone(), id, config, cx).await?;
964            this.update(cx, |this, cx| {
965                this.run_server(server, config, cx);
966            })?;
967        }
968
969        Ok(())
970    }
971}