context_server_store.rs

   1pub mod extension;
   2pub mod registry;
   3
   4use std::path::Path;
   5use std::sync::Arc;
   6use std::time::Duration;
   7
   8use anyhow::{Context as _, Result};
   9use collections::{HashMap, HashSet};
  10use context_server::{ContextServer, ContextServerCommand, ContextServerId};
  11use futures::{FutureExt as _, future::Either, future::join_all};
  12use gpui::{App, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, actions};
  13use itertools::Itertools;
  14use registry::ContextServerDescriptorRegistry;
  15use remote::RemoteClient;
  16use rpc::{AnyProtoClient, TypedEnvelope, proto};
  17use settings::{Settings as _, SettingsStore};
  18use util::{ResultExt as _, rel_path::RelPath};
  19
  20use crate::{
  21    DisableAiSettings, Project,
  22    project_settings::{ContextServerSettings, ProjectSettings},
  23    worktree_store::WorktreeStore,
  24};
  25
/// Upper bound, in seconds, for context server request timeouts.
///
/// Configured timeouts (per-server or global) are clamped to this value so that
/// extremely large timeout values cannot tie up resources indefinitely.
const MAX_TIMEOUT_SECS: u64 = 600; // 10 minutes
  29
/// Initializes context server support by delegating to [`extension::init`].
pub fn init(cx: &mut App) {
    extension::init(cx);
}
  33
// Declares the actions registered under the `context_server` name.
actions!(
    context_server,
    [
        /// Restarts the context server.
        Restart
    ]
);
  41
/// Public lifecycle status of a context server.
///
/// This is the data-free projection of the store's internal per-server state
/// that is reported to listeners via [`ServerStatusChangedEvent`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ContextServerStatus {
    /// The server's start-up is in progress.
    Starting,
    /// The server started successfully.
    Running,
    /// The server is known to the store but is not running.
    Stopped,
    /// The server failed to start; carries the error message.
    Error(Arc<str>),
}
  49
  50impl ContextServerStatus {
  51    fn from_state(state: &ContextServerState) -> Self {
  52        match state {
  53            ContextServerState::Starting { .. } => ContextServerStatus::Starting,
  54            ContextServerState::Running { .. } => ContextServerStatus::Running,
  55            ContextServerState::Stopped { .. } => ContextServerStatus::Stopped,
  56            ContextServerState::Error { error, .. } => ContextServerStatus::Error(error.clone()),
  57        }
  58    }
  59}
  60
/// Internal per-server state tracked by the store.
///
/// Every variant keeps the server handle and the configuration it was created
/// from, so both remain available regardless of the lifecycle stage.
enum ContextServerState {
    /// Start-up has been kicked off and has not completed yet.
    Starting {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
        // The task driving start-up; stored here so it lives as long as this state does.
        _task: Task<()>,
    },
    /// Start-up completed successfully.
    Running {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
    },
    /// The server was stopped but is still tracked by the store.
    Stopped {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
    },
    /// Start-up failed; `error` holds the failure message.
    Error {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
        error: Arc<str>,
    },
}
  81
  82impl ContextServerState {
  83    pub fn server(&self) -> Arc<ContextServer> {
  84        match self {
  85            ContextServerState::Starting { server, .. } => server.clone(),
  86            ContextServerState::Running { server, .. } => server.clone(),
  87            ContextServerState::Stopped { server, .. } => server.clone(),
  88            ContextServerState::Error { server, .. } => server.clone(),
  89        }
  90    }
  91
  92    pub fn configuration(&self) -> Arc<ContextServerConfiguration> {
  93        match self {
  94            ContextServerState::Starting { configuration, .. } => configuration.clone(),
  95            ContextServerState::Running { configuration, .. } => configuration.clone(),
  96            ContextServerState::Stopped { configuration, .. } => configuration.clone(),
  97            ContextServerState::Error { configuration, .. } => configuration.clone(),
  98        }
  99    }
 100}
 101
/// Resolved configuration from which a context server is created.
#[derive(Debug, PartialEq, Eq)]
pub enum ContextServerConfiguration {
    /// A command-based server whose command was given explicitly.
    Custom {
        command: ContextServerCommand,
        /// Whether the command should be resolved through the remote host when
        /// the project is remote.
        remote: bool,
    },
    /// A command-based server whose command was resolved from a registered
    /// descriptor, together with its settings.
    Extension {
        command: ContextServerCommand,
        settings: serde_json::Value,
        /// Whether the command should be resolved through the remote host when
        /// the project is remote.
        remote: bool,
    },
    /// A server reached over HTTP rather than by running a command.
    Http {
        url: url::Url,
        headers: HashMap<String, String>,
        /// Optional per-server request timeout, interpreted as seconds.
        timeout: Option<u64>,
    },
}
 119
 120impl ContextServerConfiguration {
 121    pub fn command(&self) -> Option<&ContextServerCommand> {
 122        match self {
 123            ContextServerConfiguration::Custom { command, .. } => Some(command),
 124            ContextServerConfiguration::Extension { command, .. } => Some(command),
 125            ContextServerConfiguration::Http { .. } => None,
 126        }
 127    }
 128
 129    pub fn remote(&self) -> bool {
 130        match self {
 131            ContextServerConfiguration::Custom { remote, .. } => *remote,
 132            ContextServerConfiguration::Extension { remote, .. } => *remote,
 133            ContextServerConfiguration::Http { .. } => false,
 134        }
 135    }
 136
 137    pub async fn from_settings(
 138        settings: ContextServerSettings,
 139        id: ContextServerId,
 140        registry: Entity<ContextServerDescriptorRegistry>,
 141        worktree_store: Entity<WorktreeStore>,
 142        cx: &AsyncApp,
 143    ) -> Option<Self> {
 144        const EXTENSION_COMMAND_TIMEOUT: Duration = Duration::from_secs(30);
 145
 146        match settings {
 147            ContextServerSettings::Stdio {
 148                enabled: _,
 149                command,
 150                remote,
 151            } => Some(ContextServerConfiguration::Custom { command, remote }),
 152            ContextServerSettings::Extension {
 153                enabled: _,
 154                settings,
 155                remote,
 156            } => {
 157                let descriptor =
 158                    cx.update(|cx| registry.read(cx).context_server_descriptor(&id.0))?;
 159
 160                let command_future = descriptor.command(worktree_store, cx);
 161                let timeout_future = cx.background_executor().timer(EXTENSION_COMMAND_TIMEOUT);
 162
 163                match futures::future::select(command_future, timeout_future).await {
 164                    Either::Left((Ok(command), _)) => Some(ContextServerConfiguration::Extension {
 165                        command,
 166                        settings,
 167                        remote,
 168                    }),
 169                    Either::Left((Err(e), _)) => {
 170                        log::error!(
 171                            "Failed to create context server configuration from settings: {e:#}"
 172                        );
 173                        None
 174                    }
 175                    Either::Right(_) => {
 176                        log::error!(
 177                            "Timed out resolving command for extension context server {id}"
 178                        );
 179                        None
 180                    }
 181                }
 182            }
 183            ContextServerSettings::Http {
 184                enabled: _,
 185                url,
 186                headers: auth,
 187                timeout,
 188            } => {
 189                let url = url::Url::parse(&url).log_err()?;
 190                Some(ContextServerConfiguration::Http {
 191                    url,
 192                    headers: auth,
 193                    timeout,
 194                })
 195            }
 196        }
 197    }
 198}
 199
/// Callback that builds a [`ContextServer`] from an id and a resolved
/// configuration. When the store has one set, it is used instead of the
/// built-in server construction.
pub type ContextServerFactory =
    Box<dyn Fn(ContextServerId, Arc<ContextServerConfiguration>) -> Arc<ContextServer>>;
 202
/// Whether the store belongs to a local or a remote project, with the
/// connection data specific to each case.
enum ContextServerStoreState {
    /// The store belongs to a local project.
    Local {
        /// Project id and client recorded when the store is shared.
        downstream_client: Option<(u64, AnyProtoClient)>,
        /// Whether the store was created in headless mode.
        is_headless: bool,
    },
    /// The store belongs to a remote project.
    Remote {
        project_id: u64,
        /// Client used to ask the remote side for context server commands.
        upstream_client: Entity<RemoteClient>,
    },
}
 213
/// Tracks the context servers of a project: their settings, their lifecycle
/// state, and the list of known server ids.
pub struct ContextServerStore {
    // Local/remote mode and the associated connection data.
    state: ContextServerStoreState,
    // Latest context server settings, keyed by server id.
    context_server_settings: HashMap<Arc<str>, ContextServerSettings>,
    // Lifecycle state of every server the store currently tracks.
    servers: HashMap<ContextServerId, ContextServerState>,
    // Cached, sorted list of known server ids (see `populate_server_ids`).
    server_ids: Vec<ContextServerId>,
    worktree_store: Entity<WorktreeStore>,
    project: Option<WeakEntity<Project>>,
    // Registry of context server descriptors.
    registry: Entity<ContextServerDescriptorRegistry>,
    // The in-flight server maintenance task, if one is running.
    update_servers_task: Option<Task<Result<()>>>,
    // Optional override for how servers are constructed.
    context_server_factory: Option<ContextServerFactory>,
    // Set when a change arrives while a maintenance task is already running,
    // so another pass is scheduled once it finishes.
    needs_server_update: bool,
    // Last observed value of the "disable AI" setting.
    ai_disabled: bool,
    // Observer subscriptions held for the lifetime of the store.
    _subscriptions: Vec<Subscription>,
}
 228
/// Event emitted by [`ContextServerStore`] whenever a server's status changes.
pub struct ServerStatusChangedEvent {
    /// The server whose status changed.
    pub server_id: ContextServerId,
    /// The server's new status.
    pub status: ContextServerStatus,
}

// Lets the store emit `ServerStatusChangedEvent`s.
impl EventEmitter<ServerStatusChangedEvent> for ContextServerStore {}
 235
 236impl ContextServerStore {
 237    pub fn local(
 238        worktree_store: Entity<WorktreeStore>,
 239        weak_project: Option<WeakEntity<Project>>,
 240        headless: bool,
 241        cx: &mut Context<Self>,
 242    ) -> Self {
 243        Self::new_internal(
 244            !headless,
 245            None,
 246            ContextServerDescriptorRegistry::default_global(cx),
 247            worktree_store,
 248            weak_project,
 249            ContextServerStoreState::Local {
 250                downstream_client: None,
 251                is_headless: headless,
 252            },
 253            cx,
 254        )
 255    }
 256
 257    pub fn remote(
 258        project_id: u64,
 259        upstream_client: Entity<RemoteClient>,
 260        worktree_store: Entity<WorktreeStore>,
 261        weak_project: Option<WeakEntity<Project>>,
 262        cx: &mut Context<Self>,
 263    ) -> Self {
 264        Self::new_internal(
 265            true,
 266            None,
 267            ContextServerDescriptorRegistry::default_global(cx),
 268            worktree_store,
 269            weak_project,
 270            ContextServerStoreState::Remote {
 271                project_id,
 272                upstream_client,
 273            },
 274            cx,
 275        )
 276    }
 277
    /// Registers the handler that answers `GetContextServerCommand` requests
    /// on the given session.
    pub fn init_headless(session: &AnyProtoClient) {
        session.add_entity_request_handler(Self::handle_get_context_server_command);
    }
 281
 282    pub fn shared(&mut self, project_id: u64, client: AnyProtoClient) {
 283        if let ContextServerStoreState::Local {
 284            downstream_client, ..
 285        } = &mut self.state
 286        {
 287            *downstream_client = Some((project_id, client));
 288        }
 289    }
 290
    /// Returns `true` when this store was created for a remote project.
    pub fn is_remote_project(&self) -> bool {
        matches!(self.state, ContextServerStoreState::Remote { .. })
    }
 294
 295    /// Returns all configured context server ids, excluding the ones that are disabled
 296    pub fn configured_server_ids(&self) -> Vec<ContextServerId> {
 297        self.context_server_settings
 298            .iter()
 299            .filter(|(_, settings)| settings.enabled())
 300            .map(|(id, _)| ContextServerId(id.clone()))
 301            .collect()
 302    }
 303
 304    #[cfg(feature = "test-support")]
 305    pub fn test(
 306        registry: Entity<ContextServerDescriptorRegistry>,
 307        worktree_store: Entity<WorktreeStore>,
 308        weak_project: Option<WeakEntity<Project>>,
 309        cx: &mut Context<Self>,
 310    ) -> Self {
 311        Self::new_internal(
 312            false,
 313            None,
 314            registry,
 315            worktree_store,
 316            weak_project,
 317            ContextServerStoreState::Local {
 318                downstream_client: None,
 319                is_headless: false,
 320            },
 321            cx,
 322        )
 323    }
 324
 325    #[cfg(feature = "test-support")]
 326    pub fn test_maintain_server_loop(
 327        context_server_factory: Option<ContextServerFactory>,
 328        registry: Entity<ContextServerDescriptorRegistry>,
 329        worktree_store: Entity<WorktreeStore>,
 330        weak_project: Option<WeakEntity<Project>>,
 331        cx: &mut Context<Self>,
 332    ) -> Self {
 333        Self::new_internal(
 334            true,
 335            context_server_factory,
 336            registry,
 337            worktree_store,
 338            weak_project,
 339            ContextServerStoreState::Local {
 340                downstream_client: None,
 341                is_headless: false,
 342            },
 343            cx,
 344        )
 345    }
 346
    /// Test-only: installs the factory used to construct context servers,
    /// replacing any previously set one.
    #[cfg(feature = "test-support")]
    pub fn set_context_server_factory(&mut self, factory: ContextServerFactory) {
        self.context_server_factory = Some(factory);
    }
 351
    /// Test-only: returns the descriptor registry this store was created with.
    #[cfg(feature = "test-support")]
    pub fn registry(&self) -> &Entity<ContextServerDescriptorRegistry> {
        &self.registry
    }
 356
 357    #[cfg(feature = "test-support")]
 358    pub fn test_start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
 359        let configuration = Arc::new(ContextServerConfiguration::Custom {
 360            command: ContextServerCommand {
 361                path: "test".into(),
 362                args: vec![],
 363                env: None,
 364                timeout: None,
 365            },
 366            remote: false,
 367        });
 368        self.run_server(server, configuration, cx);
 369    }
 370
 371    fn new_internal(
 372        maintain_server_loop: bool,
 373        context_server_factory: Option<ContextServerFactory>,
 374        registry: Entity<ContextServerDescriptorRegistry>,
 375        worktree_store: Entity<WorktreeStore>,
 376        weak_project: Option<WeakEntity<Project>>,
 377        state: ContextServerStoreState,
 378        cx: &mut Context<Self>,
 379    ) -> Self {
 380        let mut subscriptions = vec![cx.observe_global::<SettingsStore>(move |this, cx| {
 381            let ai_disabled = DisableAiSettings::get_global(cx).disable_ai;
 382            let ai_was_disabled = this.ai_disabled;
 383            this.ai_disabled = ai_disabled;
 384
 385            let settings =
 386                &Self::resolve_project_settings(&this.worktree_store, cx).context_servers;
 387            let settings_changed = &this.context_server_settings != settings;
 388
 389            if settings_changed {
 390                this.context_server_settings = settings.clone();
 391            }
 392
 393            // When AI is disabled, stop all running servers
 394            if ai_disabled {
 395                let server_ids: Vec<_> = this.servers.keys().cloned().collect();
 396                for id in server_ids {
 397                    this.stop_server(&id, cx).log_err();
 398                }
 399                return;
 400            }
 401
 402            // Trigger updates if AI was re-enabled or settings changed
 403            if maintain_server_loop && (ai_was_disabled || settings_changed) {
 404                this.available_context_servers_changed(cx);
 405            }
 406        })];
 407
 408        if maintain_server_loop {
 409            subscriptions.push(cx.observe(&registry, |this, _registry, cx| {
 410                if !DisableAiSettings::get_global(cx).disable_ai {
 411                    this.available_context_servers_changed(cx);
 412                }
 413            }));
 414        }
 415
 416        let ai_disabled = DisableAiSettings::get_global(cx).disable_ai;
 417        let mut this = Self {
 418            state,
 419            _subscriptions: subscriptions,
 420            context_server_settings: Self::resolve_project_settings(&worktree_store, cx)
 421                .context_servers
 422                .clone(),
 423            worktree_store,
 424            project: weak_project,
 425            registry,
 426            needs_server_update: false,
 427            ai_disabled,
 428            servers: HashMap::default(),
 429            server_ids: Default::default(),
 430            update_servers_task: None,
 431            context_server_factory,
 432        };
 433        if maintain_server_loop && !DisableAiSettings::get_global(cx).disable_ai {
 434            this.available_context_servers_changed(cx);
 435        }
 436        this
 437    }
 438
 439    pub fn get_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
 440        self.servers.get(id).map(|state| state.server())
 441    }
 442
 443    pub fn get_running_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
 444        if let Some(ContextServerState::Running { server, .. }) = self.servers.get(id) {
 445            Some(server.clone())
 446        } else {
 447            None
 448        }
 449    }
 450
 451    pub fn status_for_server(&self, id: &ContextServerId) -> Option<ContextServerStatus> {
 452        self.servers.get(id).map(ContextServerStatus::from_state)
 453    }
 454
 455    pub fn configuration_for_server(
 456        &self,
 457        id: &ContextServerId,
 458    ) -> Option<Arc<ContextServerConfiguration>> {
 459        self.servers.get(id).map(|state| state.configuration())
 460    }
 461
 462    /// Returns a sorted slice of available unique context server IDs. Within the
 463    /// slice, context servers which have `mcp-server-` as a prefix in their ID will
 464    /// appear after servers that do not have this prefix in their ID.
 465    pub fn server_ids(&self) -> &[ContextServerId] {
 466        self.server_ids.as_slice()
 467    }
 468
 469    fn populate_server_ids(&mut self, cx: &App) {
 470        self.server_ids = self
 471            .servers
 472            .keys()
 473            .cloned()
 474            .chain(
 475                self.registry
 476                    .read(cx)
 477                    .context_server_descriptors()
 478                    .into_iter()
 479                    .map(|(id, _)| ContextServerId(id)),
 480            )
 481            .chain(
 482                self.context_server_settings
 483                    .keys()
 484                    .map(|id| ContextServerId(id.clone())),
 485            )
 486            .unique()
 487            .sorted_unstable_by(
 488                // Sort context servers: ones without mcp-server- prefix first, then prefixed ones
 489                |a, b| {
 490                    const MCP_PREFIX: &str = "mcp-server-";
 491                    match (a.0.strip_prefix(MCP_PREFIX), b.0.strip_prefix(MCP_PREFIX)) {
 492                        // If one has mcp-server- prefix and other doesn't, non-mcp comes first
 493                        (Some(_), None) => std::cmp::Ordering::Greater,
 494                        (None, Some(_)) => std::cmp::Ordering::Less,
 495                        // If both have same prefix status, sort by appropriate key
 496                        (Some(a), Some(b)) => a.cmp(b),
 497                        (None, None) => a.0.cmp(&b.0),
 498                    }
 499                },
 500            )
 501            .collect();
 502    }
 503
 504    pub fn running_servers(&self) -> Vec<Arc<ContextServer>> {
 505        self.servers
 506            .values()
 507            .filter_map(|state| {
 508                if let ContextServerState::Running { server, .. } = state {
 509                    Some(server.clone())
 510                } else {
 511                    None
 512                }
 513            })
 514            .collect()
 515    }
 516
    /// Starts `server` asynchronously using its current settings.
    ///
    /// The spawned task looks up the settings for the server's id, does nothing
    /// if the server is disabled, builds a configuration from the settings and
    /// then runs the server. Failures (store dropped, missing settings,
    /// configuration could not be built) are logged rather than returned.
    pub fn start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
        cx.spawn(async move |this, cx| {
            let this = this.upgrade().context("Context server store dropped")?;
            let settings = this
                .update(cx, |this, _| {
                    this.context_server_settings.get(&server.id().0).cloned()
                })
                .context("Failed to get context server settings")?;

            // Disabled servers are silently skipped.
            if !settings.enabled() {
                return anyhow::Ok(());
            }

            let (registry, worktree_store) = this.update(cx, |this, _| {
                (this.registry.clone(), this.worktree_store.clone())
            });
            let configuration = ContextServerConfiguration::from_settings(
                settings,
                server.id(),
                registry,
                worktree_store,
                cx,
            )
            .await
            .context("Failed to create context server configuration")?;

            this.update(cx, |this, cx| {
                this.run_server(server, Arc::new(configuration), cx)
            });
            Ok(())
        })
        .detach_and_log_err(cx);
    }
 550
 551    pub fn stop_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
 552        if matches!(
 553            self.servers.get(id),
 554            Some(ContextServerState::Stopped { .. })
 555        ) {
 556            return Ok(());
 557        }
 558
 559        let state = self
 560            .servers
 561            .remove(id)
 562            .context("Context server not found")?;
 563
 564        let server = state.server();
 565        let configuration = state.configuration();
 566        let mut result = Ok(());
 567        if let ContextServerState::Running { server, .. } = &state {
 568            result = server.stop();
 569        }
 570        drop(state);
 571
 572        self.update_server_state(
 573            id.clone(),
 574            ContextServerState::Stopped {
 575                configuration,
 576                server,
 577            },
 578            cx,
 579        );
 580
 581        result
 582    }
 583
 584    fn run_server(
 585        &mut self,
 586        server: Arc<ContextServer>,
 587        configuration: Arc<ContextServerConfiguration>,
 588        cx: &mut Context<Self>,
 589    ) {
 590        let id = server.id();
 591        if matches!(
 592            self.servers.get(&id),
 593            Some(ContextServerState::Starting { .. } | ContextServerState::Running { .. })
 594        ) {
 595            self.stop_server(&id, cx).log_err();
 596        }
 597        let task = cx.spawn({
 598            let id = server.id();
 599            let server = server.clone();
 600            let configuration = configuration.clone();
 601
 602            async move |this, cx| {
 603                match server.clone().start(cx).await {
 604                    Ok(_) => {
 605                        debug_assert!(server.client().is_some());
 606
 607                        this.update(cx, |this, cx| {
 608                            this.update_server_state(
 609                                id.clone(),
 610                                ContextServerState::Running {
 611                                    server,
 612                                    configuration,
 613                                },
 614                                cx,
 615                            )
 616                        })
 617                        .log_err()
 618                    }
 619                    Err(err) => {
 620                        log::error!("{} context server failed to start: {}", id, err);
 621                        this.update(cx, |this, cx| {
 622                            this.update_server_state(
 623                                id.clone(),
 624                                ContextServerState::Error {
 625                                    configuration,
 626                                    server,
 627                                    error: err.to_string().into(),
 628                                },
 629                                cx,
 630                            )
 631                        })
 632                        .log_err()
 633                    }
 634                };
 635            }
 636        });
 637
 638        self.update_server_state(
 639            id.clone(),
 640            ContextServerState::Starting {
 641                configuration,
 642                _task: task,
 643                server,
 644            },
 645            cx,
 646        );
 647    }
 648
 649    fn remove_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
 650        let state = self
 651            .servers
 652            .remove(id)
 653            .context("Context server not found")?;
 654        drop(state);
 655        cx.emit(ServerStatusChangedEvent {
 656            server_id: id.clone(),
 657            status: ContextServerStatus::Stopped,
 658        });
 659        Ok(())
 660    }
 661
    /// Creates a [`ContextServer`] for `id` from `configuration`.
    ///
    /// For command-based configurations flagged as remote in a remote project,
    /// the command is first requested from the upstream client and the
    /// configuration is replaced by a `Custom` one built from the response.
    /// Returns the server together with the configuration that was actually
    /// used.
    ///
    /// # Errors
    ///
    /// Fails if the store has been dropped, if the upstream request or command
    /// construction fails, if the HTTP server cannot be constructed, or if a
    /// non-HTTP configuration has no command.
    pub async fn create_context_server(
        this: WeakEntity<Self>,
        id: ContextServerId,
        configuration: Arc<ContextServerConfiguration>,
        cx: &mut AsyncApp,
    ) -> Result<(Arc<ContextServer>, Arc<ContextServerConfiguration>)> {
        let remote = configuration.remote();
        // Only command-based configurations can require a remotely resolved command.
        let needs_remote_command = match configuration.as_ref() {
            ContextServerConfiguration::Custom { .. }
            | ContextServerConfiguration::Extension { .. } => remote,
            ContextServerConfiguration::Http { .. } => false,
        };

        // `remote_state` is only set when the store is remote AND the
        // configuration asks for a remote command.
        let (remote_state, is_remote_project) = this.update(cx, |this, _| {
            let remote_state = match &this.state {
                ContextServerStoreState::Remote {
                    project_id,
                    upstream_client,
                } if needs_remote_command => Some((*project_id, upstream_client.clone())),
                _ => None,
            };
            (remote_state, this.is_remote_project())
        })?;

        // Prefer the project's active directory; otherwise fall back to the
        // root directory of the first visible worktree that has one.
        let root_path: Option<Arc<Path>> = this.update(cx, |this, cx| {
            this.project
                .as_ref()
                .and_then(|project| {
                    project
                        .read_with(cx, |project, cx| project.active_project_directory(cx))
                        .ok()
                        .flatten()
                })
                .or_else(|| {
                    this.worktree_store.read_with(cx, |store, cx| {
                        store.visible_worktrees(cx).fold(None, |acc, item| {
                            if acc.is_none() {
                                item.read(cx).root_dir()
                            } else {
                                acc
                            }
                        })
                    })
                })
        })?;

        let configuration = if let Some((project_id, upstream_client)) = remote_state {
            let root_dir = root_path.as_ref().map(|p| p.display().to_string());

            // Ask the upstream side which command to run for this server.
            let response = upstream_client
                .update(cx, |client, _| {
                    client
                        .proto_client()
                        .request(proto::GetContextServerCommand {
                            project_id,
                            server_id: id.0.to_string(),
                            root_dir: root_dir.clone(),
                        })
                })
                .await?;

            // Let the remote client turn the reported command into the command
            // that is actually spawned.
            let remote_command = upstream_client.update(cx, |client, _| {
                client.build_command(
                    Some(response.path),
                    &response.args,
                    &response.env.into_iter().collect(),
                    root_dir,
                    None,
                )
            })?;

            // No timeout is carried over here; the global/default timeout is
            // applied when the server is constructed below.
            let command = ContextServerCommand {
                path: remote_command.program.into(),
                args: remote_command.args,
                env: Some(remote_command.env.into_iter().collect()),
                timeout: None,
            };

            Arc::new(ContextServerConfiguration::Custom { command, remote })
        } else {
            configuration
        };

        let server: Arc<ContextServer> = this.update(cx, |this, cx| {
            let global_timeout =
                Self::resolve_project_settings(&this.worktree_store, cx).context_server_timeout;

            // A configured factory takes precedence over the built-in construction.
            if let Some(factory) = this.context_server_factory.as_ref() {
                return anyhow::Ok(factory(id.clone(), configuration.clone()));
            }

            match configuration.as_ref() {
                ContextServerConfiguration::Http {
                    url,
                    headers,
                    timeout,
                } => anyhow::Ok(Arc::new(ContextServer::http(
                    id,
                    url,
                    headers.clone(),
                    cx.http_client(),
                    cx.background_executor().clone(),
                    // Per-server timeout if set, otherwise the global one; clamped either way.
                    Some(Duration::from_secs(
                        timeout.unwrap_or(global_timeout).min(MAX_TIMEOUT_SECS),
                    )),
                )?)),
                _ => {
                    let mut command = configuration
                        .command()
                        .context("Missing command configuration for stdio context server")?
                        .clone();
                    // Per-command timeout if set, otherwise the global one; clamped either way.
                    command.timeout = Some(
                        command
                            .timeout
                            .unwrap_or(global_timeout)
                            .min(MAX_TIMEOUT_SECS),
                    );

                    // Don't pass remote paths as working directory for locally-spawned processes
                    let working_directory = if is_remote_project { None } else { root_path };
                    anyhow::Ok(Arc::new(ContextServer::stdio(
                        id,
                        command,
                        working_directory,
                    )))
                }
            }
        })??;

        Ok((server, configuration))
    }
 793
    /// Handles a `GetContextServerCommand` request by resolving the command
    /// for the requested server and returning its path, args and env.
    ///
    /// # Errors
    ///
    /// Fails if the store is not a local headless store, if the server is
    /// neither in the settings nor known to the registry, if no configuration
    /// can be built for it, or if the configuration has no command (HTTP).
    async fn handle_get_context_server_command(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::GetContextServerCommand>,
        mut cx: AsyncApp,
    ) -> Result<proto::ContextServerCommand> {
        let server_id = ContextServerId(envelope.payload.server_id.into());

        let (settings, registry, worktree_store) = this.update(&mut cx, |this, inner_cx| {
            // Only a local store in headless mode is expected to serve this request.
            let ContextServerStoreState::Local {
                is_headless: true, ..
            } = &this.state
            else {
                anyhow::bail!("unexpected GetContextServerCommand request in a non-local project");
            };

            // Use explicit settings when present; otherwise fall back to default
            // extension settings if the registry has a descriptor for the id.
            let settings = this
                .context_server_settings
                .get(&server_id.0)
                .cloned()
                .or_else(|| {
                    this.registry
                        .read(inner_cx)
                        .context_server_descriptor(&server_id.0)
                        .map(|_| ContextServerSettings::default_extension())
                })
                .with_context(|| format!("context server `{}` not found", server_id))?;

            anyhow::Ok((settings, this.registry.clone(), this.worktree_store.clone()))
        })?;

        let configuration = ContextServerConfiguration::from_settings(
            settings,
            server_id.clone(),
            registry,
            worktree_store,
            &cx,
        )
        .await
        .with_context(|| format!("failed to build configuration for `{}`", server_id))?;

        let command = configuration
            .command()
            .context("context server has no command (HTTP servers don't need RPC)")?;

        Ok(proto::ContextServerCommand {
            path: command.path.display().to_string(),
            args: command.args.clone(),
            // A missing environment is sent as an empty one.
            env: command
                .env
                .clone()
                .map(|env| env.into_iter().collect())
                .unwrap_or_default(),
        })
    }
 848
 849    fn resolve_project_settings<'a>(
 850        worktree_store: &'a Entity<WorktreeStore>,
 851        cx: &'a App,
 852    ) -> &'a ProjectSettings {
 853        let location = worktree_store
 854            .read(cx)
 855            .visible_worktrees(cx)
 856            .next()
 857            .map(|worktree| settings::SettingsLocation {
 858                worktree_id: worktree.read(cx).id(),
 859                path: RelPath::empty(),
 860            });
 861        ProjectSettings::get(location, cx)
 862    }
 863
 864    fn update_server_state(
 865        &mut self,
 866        id: ContextServerId,
 867        state: ContextServerState,
 868        cx: &mut Context<Self>,
 869    ) {
 870        let status = ContextServerStatus::from_state(&state);
 871        self.servers.insert(id.clone(), state);
 872        cx.emit(ServerStatusChangedEvent {
 873            server_id: id,
 874            status,
 875        });
 876    }
 877
 878    fn available_context_servers_changed(&mut self, cx: &mut Context<Self>) {
 879        if self.update_servers_task.is_some() {
 880            self.needs_server_update = true;
 881        } else {
 882            self.needs_server_update = false;
 883            self.update_servers_task = Some(cx.spawn(async move |this, cx| {
 884                if let Err(err) = Self::maintain_servers(this.clone(), cx).await {
 885                    log::error!("Error maintaining context servers: {}", err);
 886                }
 887
 888                this.update(cx, |this, cx| {
 889                    this.populate_server_ids(cx);
 890                    cx.notify();
 891                    this.update_servers_task.take();
 892                    if this.needs_server_update {
 893                        this.available_context_servers_changed(cx);
 894                    }
 895                })?;
 896
 897                Ok(())
 898            }));
 899        }
 900    }
 901
 902    async fn maintain_servers(this: WeakEntity<Self>, cx: &mut AsyncApp) -> Result<()> {
 903        // Don't start context servers if AI is disabled
 904        let ai_disabled = this.update(cx, |_, cx| DisableAiSettings::get_global(cx).disable_ai)?;
 905        if ai_disabled {
 906            // Stop all running servers when AI is disabled
 907            this.update(cx, |this, cx| {
 908                let server_ids: Vec<_> = this.servers.keys().cloned().collect();
 909                for id in server_ids {
 910                    let _ = this.stop_server(&id, cx);
 911                }
 912            })?;
 913            return Ok(());
 914        }
 915
 916        let (mut configured_servers, registry, worktree_store) = this.update(cx, |this, _| {
 917            (
 918                this.context_server_settings.clone(),
 919                this.registry.clone(),
 920                this.worktree_store.clone(),
 921            )
 922        })?;
 923
 924        for (id, _) in registry.read_with(cx, |registry, _| registry.context_server_descriptors()) {
 925            configured_servers
 926                .entry(id)
 927                .or_insert(ContextServerSettings::default_extension());
 928        }
 929
 930        let (enabled_servers, disabled_servers): (HashMap<_, _>, HashMap<_, _>) =
 931            configured_servers
 932                .into_iter()
 933                .partition(|(_, settings)| settings.enabled());
 934
 935        let configured_servers = join_all(enabled_servers.into_iter().map(|(id, settings)| {
 936            let id = ContextServerId(id);
 937            ContextServerConfiguration::from_settings(
 938                settings,
 939                id.clone(),
 940                registry.clone(),
 941                worktree_store.clone(),
 942                cx,
 943            )
 944            .map(move |config| (id, config))
 945        }))
 946        .await
 947        .into_iter()
 948        .filter_map(|(id, config)| config.map(|config| (id, config)))
 949        .collect::<HashMap<_, _>>();
 950
 951        let mut servers_to_start = Vec::new();
 952        let mut servers_to_remove = HashSet::default();
 953        let mut servers_to_stop = HashSet::default();
 954
 955        this.update(cx, |this, _cx| {
 956            for server_id in this.servers.keys() {
 957                // All servers that are not in desired_servers should be removed from the store.
 958                // This can happen if the user removed a server from the context server settings.
 959                if !configured_servers.contains_key(server_id) {
 960                    if disabled_servers.contains_key(&server_id.0) {
 961                        servers_to_stop.insert(server_id.clone());
 962                    } else {
 963                        servers_to_remove.insert(server_id.clone());
 964                    }
 965                }
 966            }
 967
 968            for (id, config) in configured_servers {
 969                let state = this.servers.get(&id);
 970                let is_stopped = matches!(state, Some(ContextServerState::Stopped { .. }));
 971                let existing_config = state.as_ref().map(|state| state.configuration());
 972                if existing_config.as_deref() != Some(&config) || is_stopped {
 973                    let config = Arc::new(config);
 974                    servers_to_start.push((id.clone(), config));
 975                    if this.servers.contains_key(&id) {
 976                        servers_to_stop.insert(id);
 977                    }
 978                }
 979            }
 980
 981            anyhow::Ok(())
 982        })??;
 983
 984        this.update(cx, |this, inner_cx| {
 985            for id in servers_to_stop {
 986                this.stop_server(&id, inner_cx)?;
 987            }
 988            for id in servers_to_remove {
 989                this.remove_server(&id, inner_cx)?;
 990            }
 991            anyhow::Ok(())
 992        })??;
 993
 994        for (id, config) in servers_to_start {
 995            match Self::create_context_server(this.clone(), id.clone(), config, cx).await {
 996                Ok((server, config)) => {
 997                    this.update(cx, |this, cx| {
 998                        this.run_server(server, config, cx);
 999                    })?;
1000                }
1001                Err(err) => {
1002                    log::error!("{id} context server failed to create: {err:#}");
1003                    this.update(cx, |_this, cx| {
1004                        cx.emit(ServerStatusChangedEvent {
1005                            server_id: id,
1006                            status: ContextServerStatus::Error(err.to_string().into()),
1007                        });
1008                        cx.notify();
1009                    })?;
1010                }
1011            }
1012        }
1013
1014        Ok(())
1015    }
1016}