context_server_store.rs

   1pub mod extension;
   2pub mod registry;
   3
   4use std::path::Path;
   5use std::sync::Arc;
   6use std::time::Duration;
   7
   8use anyhow::{Context as _, Result};
   9use collections::{HashMap, HashSet};
  10use context_server::{ContextServer, ContextServerCommand, ContextServerId};
  11use futures::{FutureExt as _, future::join_all};
  12use gpui::{App, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, actions};
  13use registry::ContextServerDescriptorRegistry;
  14use remote::RemoteClient;
  15use rpc::{AnyProtoClient, TypedEnvelope, proto};
  16use settings::{Settings as _, SettingsStore};
  17use util::{ResultExt as _, rel_path::RelPath};
  18
  19use crate::{
  20    Project,
  21    project_settings::{ContextServerSettings, ProjectSettings},
  22    worktree_store::WorktreeStore,
  23};
  24
  25/// Maximum timeout for context server requests
  26/// Prevents extremely large timeout values from tying up resources indefinitely.
  27const MAX_TIMEOUT_SECS: u64 = 600; // 10 minutes
  28
  29pub fn init(cx: &mut App) {
  30    extension::init(cx);
  31}
  32
  33actions!(
  34    context_server,
  35    [
  36        /// Restarts the context server.
  37        Restart
  38    ]
  39);
  40
  41#[derive(Debug, Clone, PartialEq, Eq, Hash)]
  42pub enum ContextServerStatus {
  43    Starting,
  44    Running,
  45    Stopped,
  46    Error(Arc<str>),
  47}
  48
  49impl ContextServerStatus {
  50    fn from_state(state: &ContextServerState) -> Self {
  51        match state {
  52            ContextServerState::Starting { .. } => ContextServerStatus::Starting,
  53            ContextServerState::Running { .. } => ContextServerStatus::Running,
  54            ContextServerState::Stopped { .. } => ContextServerStatus::Stopped,
  55            ContextServerState::Error { error, .. } => ContextServerStatus::Error(error.clone()),
  56        }
  57    }
  58}
  59
  60enum ContextServerState {
  61    Starting {
  62        server: Arc<ContextServer>,
  63        configuration: Arc<ContextServerConfiguration>,
  64        _task: Task<()>,
  65    },
  66    Running {
  67        server: Arc<ContextServer>,
  68        configuration: Arc<ContextServerConfiguration>,
  69    },
  70    Stopped {
  71        server: Arc<ContextServer>,
  72        configuration: Arc<ContextServerConfiguration>,
  73    },
  74    Error {
  75        server: Arc<ContextServer>,
  76        configuration: Arc<ContextServerConfiguration>,
  77        error: Arc<str>,
  78    },
  79}
  80
  81impl ContextServerState {
  82    pub fn server(&self) -> Arc<ContextServer> {
  83        match self {
  84            ContextServerState::Starting { server, .. } => server.clone(),
  85            ContextServerState::Running { server, .. } => server.clone(),
  86            ContextServerState::Stopped { server, .. } => server.clone(),
  87            ContextServerState::Error { server, .. } => server.clone(),
  88        }
  89    }
  90
  91    pub fn configuration(&self) -> Arc<ContextServerConfiguration> {
  92        match self {
  93            ContextServerState::Starting { configuration, .. } => configuration.clone(),
  94            ContextServerState::Running { configuration, .. } => configuration.clone(),
  95            ContextServerState::Stopped { configuration, .. } => configuration.clone(),
  96            ContextServerState::Error { configuration, .. } => configuration.clone(),
  97        }
  98    }
  99}
 100
 101#[derive(Debug, PartialEq, Eq)]
 102pub enum ContextServerConfiguration {
 103    Custom {
 104        command: ContextServerCommand,
 105        remote: bool,
 106    },
 107    Extension {
 108        command: ContextServerCommand,
 109        settings: serde_json::Value,
 110        remote: bool,
 111    },
 112    Http {
 113        url: url::Url,
 114        headers: HashMap<String, String>,
 115        timeout: Option<u64>,
 116    },
 117}
 118
 119impl ContextServerConfiguration {
 120    pub fn command(&self) -> Option<&ContextServerCommand> {
 121        match self {
 122            ContextServerConfiguration::Custom { command, .. } => Some(command),
 123            ContextServerConfiguration::Extension { command, .. } => Some(command),
 124            ContextServerConfiguration::Http { .. } => None,
 125        }
 126    }
 127
 128    pub fn remote(&self) -> bool {
 129        match self {
 130            ContextServerConfiguration::Custom { remote, .. } => *remote,
 131            ContextServerConfiguration::Extension { remote, .. } => *remote,
 132            ContextServerConfiguration::Http { .. } => false,
 133        }
 134    }
 135
 136    pub async fn from_settings(
 137        settings: ContextServerSettings,
 138        id: ContextServerId,
 139        registry: Entity<ContextServerDescriptorRegistry>,
 140        worktree_store: Entity<WorktreeStore>,
 141        cx: &AsyncApp,
 142    ) -> Option<Self> {
 143        match settings {
 144            ContextServerSettings::Stdio {
 145                enabled: _,
 146                command,
 147                remote,
 148            } => Some(ContextServerConfiguration::Custom { command, remote }),
 149            ContextServerSettings::Extension {
 150                enabled: _,
 151                settings,
 152                remote,
 153            } => {
 154                let descriptor =
 155                    cx.update(|cx| registry.read(cx).context_server_descriptor(&id.0))?;
 156
 157                match descriptor.command(worktree_store, cx).await {
 158                    Ok(command) => Some(ContextServerConfiguration::Extension {
 159                        command,
 160                        settings,
 161                        remote,
 162                    }),
 163                    Err(e) => {
 164                        log::error!(
 165                            "Failed to create context server configuration from settings: {e:#}"
 166                        );
 167                        None
 168                    }
 169                }
 170            }
 171            ContextServerSettings::Http {
 172                enabled: _,
 173                url,
 174                headers: auth,
 175                timeout,
 176            } => {
 177                let url = url::Url::parse(&url).log_err()?;
 178                Some(ContextServerConfiguration::Http {
 179                    url,
 180                    headers: auth,
 181                    timeout,
 182                })
 183            }
 184        }
 185    }
 186}
 187
 188pub type ContextServerFactory =
 189    Box<dyn Fn(ContextServerId, Arc<ContextServerConfiguration>) -> Arc<ContextServer>>;
 190
 191enum ContextServerStoreState {
 192    Local {
 193        downstream_client: Option<(u64, AnyProtoClient)>,
 194        is_headless: bool,
 195    },
 196    Remote {
 197        project_id: u64,
 198        upstream_client: Entity<RemoteClient>,
 199    },
 200}
 201
 202pub struct ContextServerStore {
 203    state: ContextServerStoreState,
 204    context_server_settings: HashMap<Arc<str>, ContextServerSettings>,
 205    servers: HashMap<ContextServerId, ContextServerState>,
 206    worktree_store: Entity<WorktreeStore>,
 207    project: Option<WeakEntity<Project>>,
 208    registry: Entity<ContextServerDescriptorRegistry>,
 209    update_servers_task: Option<Task<Result<()>>>,
 210    context_server_factory: Option<ContextServerFactory>,
 211    needs_server_update: bool,
 212    _subscriptions: Vec<Subscription>,
 213}
 214
 215pub enum Event {
 216    ServerStatusChanged {
 217        server_id: ContextServerId,
 218        status: ContextServerStatus,
 219    },
 220}
 221
 222impl EventEmitter<Event> for ContextServerStore {}
 223
 224impl ContextServerStore {
 225    pub fn local(
 226        worktree_store: Entity<WorktreeStore>,
 227        weak_project: Option<WeakEntity<Project>>,
 228        headless: bool,
 229        cx: &mut Context<Self>,
 230    ) -> Self {
 231        Self::new_internal(
 232            !headless,
 233            None,
 234            ContextServerDescriptorRegistry::default_global(cx),
 235            worktree_store,
 236            weak_project,
 237            ContextServerStoreState::Local {
 238                downstream_client: None,
 239                is_headless: headless,
 240            },
 241            cx,
 242        )
 243    }
 244
 245    pub fn remote(
 246        project_id: u64,
 247        upstream_client: Entity<RemoteClient>,
 248        worktree_store: Entity<WorktreeStore>,
 249        weak_project: Option<WeakEntity<Project>>,
 250        cx: &mut Context<Self>,
 251    ) -> Self {
 252        Self::new_internal(
 253            true,
 254            None,
 255            ContextServerDescriptorRegistry::default_global(cx),
 256            worktree_store,
 257            weak_project,
 258            ContextServerStoreState::Remote {
 259                project_id,
 260                upstream_client,
 261            },
 262            cx,
 263        )
 264    }
 265
 266    pub fn init_headless(session: &AnyProtoClient) {
 267        session.add_entity_request_handler(Self::handle_get_context_server_command);
 268    }
 269
 270    pub fn shared(&mut self, project_id: u64, client: AnyProtoClient) {
 271        if let ContextServerStoreState::Local {
 272            downstream_client, ..
 273        } = &mut self.state
 274        {
 275            *downstream_client = Some((project_id, client));
 276        }
 277    }
 278
 279    pub fn is_remote_project(&self) -> bool {
 280        matches!(self.state, ContextServerStoreState::Remote { .. })
 281    }
 282
 283    /// Returns all configured context server ids, excluding the ones that are disabled
 284    pub fn configured_server_ids(&self) -> Vec<ContextServerId> {
 285        self.context_server_settings
 286            .iter()
 287            .filter(|(_, settings)| settings.enabled())
 288            .map(|(id, _)| ContextServerId(id.clone()))
 289            .collect()
 290    }
 291
 292    #[cfg(any(test, feature = "test-support"))]
 293    pub fn test(
 294        registry: Entity<ContextServerDescriptorRegistry>,
 295        worktree_store: Entity<WorktreeStore>,
 296        weak_project: Option<WeakEntity<Project>>,
 297        cx: &mut Context<Self>,
 298    ) -> Self {
 299        Self::new_internal(
 300            false,
 301            None,
 302            registry,
 303            worktree_store,
 304            weak_project,
 305            ContextServerStoreState::Local {
 306                downstream_client: None,
 307                is_headless: false,
 308            },
 309            cx,
 310        )
 311    }
 312
 313    #[cfg(any(test, feature = "test-support"))]
 314    pub fn test_maintain_server_loop(
 315        context_server_factory: Option<ContextServerFactory>,
 316        registry: Entity<ContextServerDescriptorRegistry>,
 317        worktree_store: Entity<WorktreeStore>,
 318        weak_project: Option<WeakEntity<Project>>,
 319        cx: &mut Context<Self>,
 320    ) -> Self {
 321        Self::new_internal(
 322            true,
 323            context_server_factory,
 324            registry,
 325            worktree_store,
 326            weak_project,
 327            ContextServerStoreState::Local {
 328                downstream_client: None,
 329                is_headless: false,
 330            },
 331            cx,
 332        )
 333    }
 334
 335    #[cfg(any(test, feature = "test-support"))]
 336    pub fn set_context_server_factory(&mut self, factory: ContextServerFactory) {
 337        self.context_server_factory = Some(factory);
 338    }
 339
 340    #[cfg(any(test, feature = "test-support"))]
 341    pub fn registry(&self) -> &Entity<ContextServerDescriptorRegistry> {
 342        &self.registry
 343    }
 344
 345    #[cfg(any(test, feature = "test-support"))]
 346    pub fn test_start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
 347        let configuration = Arc::new(ContextServerConfiguration::Custom {
 348            command: ContextServerCommand {
 349                path: "test".into(),
 350                args: vec![],
 351                env: None,
 352                timeout: None,
 353            },
 354            remote: false,
 355        });
 356        self.run_server(server, configuration, cx);
 357    }
 358
 359    fn new_internal(
 360        maintain_server_loop: bool,
 361        context_server_factory: Option<ContextServerFactory>,
 362        registry: Entity<ContextServerDescriptorRegistry>,
 363        worktree_store: Entity<WorktreeStore>,
 364        weak_project: Option<WeakEntity<Project>>,
 365        state: ContextServerStoreState,
 366        cx: &mut Context<Self>,
 367    ) -> Self {
 368        let mut subscriptions = vec![cx.observe_global::<SettingsStore>(move |this, cx| {
 369            let settings =
 370                &Self::resolve_project_settings(&this.worktree_store, cx).context_servers;
 371            if &this.context_server_settings == settings {
 372                return;
 373            }
 374            this.context_server_settings = settings.clone();
 375            if maintain_server_loop {
 376                this.available_context_servers_changed(cx);
 377            }
 378        })];
 379
 380        if maintain_server_loop {
 381            subscriptions.push(cx.observe(&registry, |this, _registry, cx| {
 382                this.available_context_servers_changed(cx);
 383            }));
 384        }
 385
 386        let mut this = Self {
 387            state,
 388            _subscriptions: subscriptions,
 389            context_server_settings: Self::resolve_project_settings(&worktree_store, cx)
 390                .context_servers
 391                .clone(),
 392            worktree_store,
 393            project: weak_project,
 394            registry,
 395            needs_server_update: false,
 396            servers: HashMap::default(),
 397            update_servers_task: None,
 398            context_server_factory,
 399        };
 400        if maintain_server_loop {
 401            this.available_context_servers_changed(cx);
 402        }
 403        this
 404    }
 405
 406    pub fn get_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
 407        self.servers.get(id).map(|state| state.server())
 408    }
 409
 410    pub fn get_running_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
 411        if let Some(ContextServerState::Running { server, .. }) = self.servers.get(id) {
 412            Some(server.clone())
 413        } else {
 414            None
 415        }
 416    }
 417
 418    pub fn status_for_server(&self, id: &ContextServerId) -> Option<ContextServerStatus> {
 419        self.servers.get(id).map(ContextServerStatus::from_state)
 420    }
 421
 422    pub fn configuration_for_server(
 423        &self,
 424        id: &ContextServerId,
 425    ) -> Option<Arc<ContextServerConfiguration>> {
 426        self.servers.get(id).map(|state| state.configuration())
 427    }
 428
 429    pub fn server_ids(&self, cx: &App) -> HashSet<ContextServerId> {
 430        self.servers
 431            .keys()
 432            .cloned()
 433            .chain(
 434                self.registry
 435                    .read(cx)
 436                    .context_server_descriptors()
 437                    .into_iter()
 438                    .map(|(id, _)| ContextServerId(id)),
 439            )
 440            .collect()
 441    }
 442
 443    pub fn running_servers(&self) -> Vec<Arc<ContextServer>> {
 444        self.servers
 445            .values()
 446            .filter_map(|state| {
 447                if let ContextServerState::Running { server, .. } = state {
 448                    Some(server.clone())
 449                } else {
 450                    None
 451                }
 452            })
 453            .collect()
 454    }
 455
 456    pub fn start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
 457        cx.spawn(async move |this, cx| {
 458            let this = this.upgrade().context("Context server store dropped")?;
 459            let settings = this
 460                .update(cx, |this, _| {
 461                    this.context_server_settings.get(&server.id().0).cloned()
 462                })
 463                .context("Failed to get context server settings")?;
 464
 465            if !settings.enabled() {
 466                return anyhow::Ok(());
 467            }
 468
 469            let (registry, worktree_store) = this.update(cx, |this, _| {
 470                (this.registry.clone(), this.worktree_store.clone())
 471            });
 472            let configuration = ContextServerConfiguration::from_settings(
 473                settings,
 474                server.id(),
 475                registry,
 476                worktree_store,
 477                cx,
 478            )
 479            .await
 480            .context("Failed to create context server configuration")?;
 481
 482            this.update(cx, |this, cx| {
 483                this.run_server(server, Arc::new(configuration), cx)
 484            });
 485            Ok(())
 486        })
 487        .detach_and_log_err(cx);
 488    }
 489
 490    pub fn stop_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
 491        if matches!(
 492            self.servers.get(id),
 493            Some(ContextServerState::Stopped { .. })
 494        ) {
 495            return Ok(());
 496        }
 497
 498        let state = self
 499            .servers
 500            .remove(id)
 501            .context("Context server not found")?;
 502
 503        let server = state.server();
 504        let configuration = state.configuration();
 505        let mut result = Ok(());
 506        if let ContextServerState::Running { server, .. } = &state {
 507            result = server.stop();
 508        }
 509        drop(state);
 510
 511        self.update_server_state(
 512            id.clone(),
 513            ContextServerState::Stopped {
 514                configuration,
 515                server,
 516            },
 517            cx,
 518        );
 519
 520        result
 521    }
 522
 523    fn run_server(
 524        &mut self,
 525        server: Arc<ContextServer>,
 526        configuration: Arc<ContextServerConfiguration>,
 527        cx: &mut Context<Self>,
 528    ) {
 529        let id = server.id();
 530        if matches!(
 531            self.servers.get(&id),
 532            Some(ContextServerState::Starting { .. } | ContextServerState::Running { .. })
 533        ) {
 534            self.stop_server(&id, cx).log_err();
 535        }
 536        let task = cx.spawn({
 537            let id = server.id();
 538            let server = server.clone();
 539            let configuration = configuration.clone();
 540
 541            async move |this, cx| {
 542                match server.clone().start(cx).await {
 543                    Ok(_) => {
 544                        debug_assert!(server.client().is_some());
 545
 546                        this.update(cx, |this, cx| {
 547                            this.update_server_state(
 548                                id.clone(),
 549                                ContextServerState::Running {
 550                                    server,
 551                                    configuration,
 552                                },
 553                                cx,
 554                            )
 555                        })
 556                        .log_err()
 557                    }
 558                    Err(err) => {
 559                        log::error!("{} context server failed to start: {}", id, err);
 560                        this.update(cx, |this, cx| {
 561                            this.update_server_state(
 562                                id.clone(),
 563                                ContextServerState::Error {
 564                                    configuration,
 565                                    server,
 566                                    error: err.to_string().into(),
 567                                },
 568                                cx,
 569                            )
 570                        })
 571                        .log_err()
 572                    }
 573                };
 574            }
 575        });
 576
 577        self.update_server_state(
 578            id.clone(),
 579            ContextServerState::Starting {
 580                configuration,
 581                _task: task,
 582                server,
 583            },
 584            cx,
 585        );
 586    }
 587
 588    fn remove_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
 589        let state = self
 590            .servers
 591            .remove(id)
 592            .context("Context server not found")?;
 593        drop(state);
 594        cx.emit(Event::ServerStatusChanged {
 595            server_id: id.clone(),
 596            status: ContextServerStatus::Stopped,
 597        });
 598        Ok(())
 599    }
 600
 601    async fn create_context_server(
 602        this: WeakEntity<Self>,
 603        id: ContextServerId,
 604        configuration: Arc<ContextServerConfiguration>,
 605        cx: &mut AsyncApp,
 606    ) -> Result<(Arc<ContextServer>, Arc<ContextServerConfiguration>)> {
 607        let remote = configuration.remote();
 608        let needs_remote_command = match configuration.as_ref() {
 609            ContextServerConfiguration::Custom { .. }
 610            | ContextServerConfiguration::Extension { .. } => remote,
 611            ContextServerConfiguration::Http { .. } => false,
 612        };
 613
 614        let (remote_state, is_remote_project) = this.update(cx, |this, _| {
 615            let remote_state = match &this.state {
 616                ContextServerStoreState::Remote {
 617                    project_id,
 618                    upstream_client,
 619                } if needs_remote_command => Some((*project_id, upstream_client.clone())),
 620                _ => None,
 621            };
 622            (remote_state, this.is_remote_project())
 623        })?;
 624
 625        let root_path: Option<Arc<Path>> = this.update(cx, |this, cx| {
 626            this.project
 627                .as_ref()
 628                .and_then(|project| {
 629                    project
 630                        .read_with(cx, |project, cx| project.active_project_directory(cx))
 631                        .ok()
 632                        .flatten()
 633                })
 634                .or_else(|| {
 635                    this.worktree_store.read_with(cx, |store, cx| {
 636                        store.visible_worktrees(cx).fold(None, |acc, item| {
 637                            if acc.is_none() {
 638                                item.read(cx).root_dir()
 639                            } else {
 640                                acc
 641                            }
 642                        })
 643                    })
 644                })
 645        })?;
 646
 647        let configuration = if let Some((project_id, upstream_client)) = remote_state {
 648            let root_dir = root_path.as_ref().map(|p| p.display().to_string());
 649
 650            let response = upstream_client
 651                .update(cx, |client, _| {
 652                    client
 653                        .proto_client()
 654                        .request(proto::GetContextServerCommand {
 655                            project_id,
 656                            server_id: id.0.to_string(),
 657                            root_dir: root_dir.clone(),
 658                        })
 659                })
 660                .await?;
 661
 662            let remote_command = upstream_client.update(cx, |client, _| {
 663                client.build_command(
 664                    Some(response.path),
 665                    &response.args,
 666                    &response.env.into_iter().collect(),
 667                    root_dir,
 668                    None,
 669                )
 670            })?;
 671
 672            let command = ContextServerCommand {
 673                path: remote_command.program.into(),
 674                args: remote_command.args,
 675                env: Some(remote_command.env.into_iter().collect()),
 676                timeout: None,
 677            };
 678
 679            Arc::new(ContextServerConfiguration::Custom { command, remote })
 680        } else {
 681            configuration
 682        };
 683
 684        let server: Arc<ContextServer> = this.update(cx, |this, cx| {
 685            let global_timeout =
 686                Self::resolve_project_settings(&this.worktree_store, cx).context_server_timeout;
 687
 688            if let Some(factory) = this.context_server_factory.as_ref() {
 689                return anyhow::Ok(factory(id.clone(), configuration.clone()));
 690            }
 691
 692            match configuration.as_ref() {
 693                ContextServerConfiguration::Http {
 694                    url,
 695                    headers,
 696                    timeout,
 697                } => anyhow::Ok(Arc::new(ContextServer::http(
 698                    id,
 699                    url,
 700                    headers.clone(),
 701                    cx.http_client(),
 702                    cx.background_executor().clone(),
 703                    Some(Duration::from_secs(
 704                        timeout.unwrap_or(global_timeout).min(MAX_TIMEOUT_SECS),
 705                    )),
 706                )?)),
 707                _ => {
 708                    let mut command = configuration
 709                        .command()
 710                        .context("Missing command configuration for stdio context server")?
 711                        .clone();
 712                    command.timeout = Some(
 713                        command
 714                            .timeout
 715                            .unwrap_or(global_timeout)
 716                            .min(MAX_TIMEOUT_SECS),
 717                    );
 718
 719                    // Don't pass remote paths as working directory for locally-spawned processes
 720                    let working_directory = if is_remote_project { None } else { root_path };
 721                    anyhow::Ok(Arc::new(ContextServer::stdio(
 722                        id,
 723                        command,
 724                        working_directory,
 725                    )))
 726                }
 727            }
 728        })??;
 729
 730        Ok((server, configuration))
 731    }
 732
 733    async fn handle_get_context_server_command(
 734        this: Entity<Self>,
 735        envelope: TypedEnvelope<proto::GetContextServerCommand>,
 736        mut cx: AsyncApp,
 737    ) -> Result<proto::ContextServerCommand> {
 738        let server_id = ContextServerId(envelope.payload.server_id.into());
 739
 740        let (settings, registry, worktree_store) = this.update(&mut cx, |this, inner_cx| {
 741            let ContextServerStoreState::Local {
 742                is_headless: true, ..
 743            } = &this.state
 744            else {
 745                anyhow::bail!("unexpected GetContextServerCommand request in a non-local project");
 746            };
 747
 748            let settings = this
 749                .context_server_settings
 750                .get(&server_id.0)
 751                .cloned()
 752                .or_else(|| {
 753                    this.registry
 754                        .read(inner_cx)
 755                        .context_server_descriptor(&server_id.0)
 756                        .map(|_| ContextServerSettings::default_extension())
 757                })
 758                .with_context(|| format!("context server `{}` not found", server_id))?;
 759
 760            anyhow::Ok((settings, this.registry.clone(), this.worktree_store.clone()))
 761        })?;
 762
 763        let configuration = ContextServerConfiguration::from_settings(
 764            settings,
 765            server_id.clone(),
 766            registry,
 767            worktree_store,
 768            &cx,
 769        )
 770        .await
 771        .with_context(|| format!("failed to build configuration for `{}`", server_id))?;
 772
 773        let command = configuration
 774            .command()
 775            .context("context server has no command (HTTP servers don't need RPC)")?;
 776
 777        Ok(proto::ContextServerCommand {
 778            path: command.path.display().to_string(),
 779            args: command.args.clone(),
 780            env: command
 781                .env
 782                .clone()
 783                .map(|env| env.into_iter().collect())
 784                .unwrap_or_default(),
 785        })
 786    }
 787
 788    fn resolve_project_settings<'a>(
 789        worktree_store: &'a Entity<WorktreeStore>,
 790        cx: &'a App,
 791    ) -> &'a ProjectSettings {
 792        let location = worktree_store
 793            .read(cx)
 794            .visible_worktrees(cx)
 795            .next()
 796            .map(|worktree| settings::SettingsLocation {
 797                worktree_id: worktree.read(cx).id(),
 798                path: RelPath::empty(),
 799            });
 800        ProjectSettings::get(location, cx)
 801    }
 802
 803    fn update_server_state(
 804        &mut self,
 805        id: ContextServerId,
 806        state: ContextServerState,
 807        cx: &mut Context<Self>,
 808    ) {
 809        let status = ContextServerStatus::from_state(&state);
 810        self.servers.insert(id.clone(), state);
 811        cx.emit(Event::ServerStatusChanged {
 812            server_id: id,
 813            status,
 814        });
 815    }
 816
 817    fn available_context_servers_changed(&mut self, cx: &mut Context<Self>) {
 818        if self.update_servers_task.is_some() {
 819            self.needs_server_update = true;
 820        } else {
 821            self.needs_server_update = false;
 822            self.update_servers_task = Some(cx.spawn(async move |this, cx| {
 823                if let Err(err) = Self::maintain_servers(this.clone(), cx).await {
 824                    log::error!("Error maintaining context servers: {}", err);
 825                }
 826
 827                this.update(cx, |this, cx| {
 828                    this.update_servers_task.take();
 829                    if this.needs_server_update {
 830                        this.available_context_servers_changed(cx);
 831                    }
 832                })?;
 833
 834                Ok(())
 835            }));
 836        }
 837    }
 838
 839    async fn maintain_servers(this: WeakEntity<Self>, cx: &mut AsyncApp) -> Result<()> {
 840        let (mut configured_servers, registry, worktree_store) = this.update(cx, |this, _| {
 841            (
 842                this.context_server_settings.clone(),
 843                this.registry.clone(),
 844                this.worktree_store.clone(),
 845            )
 846        })?;
 847
 848        for (id, _) in registry.read_with(cx, |registry, _| registry.context_server_descriptors()) {
 849            configured_servers
 850                .entry(id)
 851                .or_insert(ContextServerSettings::default_extension());
 852        }
 853
 854        let (enabled_servers, disabled_servers): (HashMap<_, _>, HashMap<_, _>) =
 855            configured_servers
 856                .into_iter()
 857                .partition(|(_, settings)| settings.enabled());
 858
 859        let configured_servers = join_all(enabled_servers.into_iter().map(|(id, settings)| {
 860            let id = ContextServerId(id);
 861            ContextServerConfiguration::from_settings(
 862                settings,
 863                id.clone(),
 864                registry.clone(),
 865                worktree_store.clone(),
 866                cx,
 867            )
 868            .map(move |config| (id, config))
 869        }))
 870        .await
 871        .into_iter()
 872        .filter_map(|(id, config)| config.map(|config| (id, config)))
 873        .collect::<HashMap<_, _>>();
 874
 875        let mut servers_to_start = Vec::new();
 876        let mut servers_to_remove = HashSet::default();
 877        let mut servers_to_stop = HashSet::default();
 878
 879        this.update(cx, |this, _cx| {
 880            for server_id in this.servers.keys() {
 881                // All servers that are not in desired_servers should be removed from the store.
 882                // This can happen if the user removed a server from the context server settings.
 883                if !configured_servers.contains_key(server_id) {
 884                    if disabled_servers.contains_key(&server_id.0) {
 885                        servers_to_stop.insert(server_id.clone());
 886                    } else {
 887                        servers_to_remove.insert(server_id.clone());
 888                    }
 889                }
 890            }
 891
 892            for (id, config) in configured_servers {
 893                let state = this.servers.get(&id);
 894                let is_stopped = matches!(state, Some(ContextServerState::Stopped { .. }));
 895                let existing_config = state.as_ref().map(|state| state.configuration());
 896                if existing_config.as_deref() != Some(&config) || is_stopped {
 897                    let config = Arc::new(config);
 898                    servers_to_start.push((id.clone(), config));
 899                    if this.servers.contains_key(&id) {
 900                        servers_to_stop.insert(id);
 901                    }
 902                }
 903            }
 904
 905            anyhow::Ok(())
 906        })??;
 907
 908        this.update(cx, |this, inner_cx| {
 909            for id in servers_to_stop {
 910                this.stop_server(&id, inner_cx)?;
 911            }
 912            for id in servers_to_remove {
 913                this.remove_server(&id, inner_cx)?;
 914            }
 915            anyhow::Ok(())
 916        })??;
 917
 918        for (id, config) in servers_to_start {
 919            let (server, config) =
 920                Self::create_context_server(this.clone(), id, config, cx).await?;
 921            this.update(cx, |this, cx| {
 922                this.run_server(server, config, cx);
 923            })?;
 924        }
 925
 926        Ok(())
 927    }
 928}
 929
 930#[cfg(test)]
 931mod tests {
 932    use super::*;
 933    use crate::{
 934        FakeFs, Project, context_server_store::registry::ContextServerDescriptor,
 935        project_settings::ProjectSettings,
 936    };
 937    use context_server::test::create_fake_transport;
 938    use gpui::{AppContext, TestAppContext, UpdateGlobal as _};
 939    use http_client::{FakeHttpClient, Response};
 940    use serde_json::json;
 941    use std::{cell::RefCell, path::PathBuf, rc::Rc};
 942    use util::path;
 943
 944    #[gpui::test]
 945    async fn test_context_server_status(cx: &mut TestAppContext) {
 946        const SERVER_1_ID: &str = "mcp-1";
 947        const SERVER_2_ID: &str = "mcp-2";
 948
 949        let (_fs, project) = setup_context_server_test(cx, json!({"code.rs": ""}), vec![]).await;
 950
 951        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
 952        let store = cx.new(|cx| {
 953            ContextServerStore::test(
 954                registry.clone(),
 955                project.read(cx).worktree_store(),
 956                Some(project.downgrade()),
 957                cx,
 958            )
 959        });
 960
 961        let server_1_id = ContextServerId(SERVER_1_ID.into());
 962        let server_2_id = ContextServerId(SERVER_2_ID.into());
 963
 964        let server_1 = Arc::new(ContextServer::new(
 965            server_1_id.clone(),
 966            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
 967        ));
 968        let server_2 = Arc::new(ContextServer::new(
 969            server_2_id.clone(),
 970            Arc::new(create_fake_transport(SERVER_2_ID, cx.executor())),
 971        ));
 972
 973        store.update(cx, |store, cx| store.test_start_server(server_1, cx));
 974
 975        cx.run_until_parked();
 976
 977        cx.update(|cx| {
 978            assert_eq!(
 979                store.read(cx).status_for_server(&server_1_id),
 980                Some(ContextServerStatus::Running)
 981            );
 982            assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
 983        });
 984
 985        store.update(cx, |store, cx| {
 986            store.test_start_server(server_2.clone(), cx)
 987        });
 988
 989        cx.run_until_parked();
 990
 991        cx.update(|cx| {
 992            assert_eq!(
 993                store.read(cx).status_for_server(&server_1_id),
 994                Some(ContextServerStatus::Running)
 995            );
 996            assert_eq!(
 997                store.read(cx).status_for_server(&server_2_id),
 998                Some(ContextServerStatus::Running)
 999            );
1000        });
1001
1002        store
1003            .update(cx, |store, cx| store.stop_server(&server_2_id, cx))
1004            .unwrap();
1005
1006        cx.update(|cx| {
1007            assert_eq!(
1008                store.read(cx).status_for_server(&server_1_id),
1009                Some(ContextServerStatus::Running)
1010            );
1011            assert_eq!(
1012                store.read(cx).status_for_server(&server_2_id),
1013                Some(ContextServerStatus::Stopped)
1014            );
1015        });
1016    }
1017
1018    #[gpui::test]
1019    async fn test_context_server_status_events(cx: &mut TestAppContext) {
1020        const SERVER_1_ID: &str = "mcp-1";
1021        const SERVER_2_ID: &str = "mcp-2";
1022
1023        let (_fs, project) = setup_context_server_test(cx, json!({"code.rs": ""}), vec![]).await;
1024
1025        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1026        let store = cx.new(|cx| {
1027            ContextServerStore::test(
1028                registry.clone(),
1029                project.read(cx).worktree_store(),
1030                Some(project.downgrade()),
1031                cx,
1032            )
1033        });
1034
1035        let server_1_id = ContextServerId(SERVER_1_ID.into());
1036        let server_2_id = ContextServerId(SERVER_2_ID.into());
1037
1038        let server_1 = Arc::new(ContextServer::new(
1039            server_1_id.clone(),
1040            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
1041        ));
1042        let server_2 = Arc::new(ContextServer::new(
1043            server_2_id.clone(),
1044            Arc::new(create_fake_transport(SERVER_2_ID, cx.executor())),
1045        ));
1046
1047        let _server_events = assert_server_events(
1048            &store,
1049            vec![
1050                (server_1_id.clone(), ContextServerStatus::Starting),
1051                (server_1_id, ContextServerStatus::Running),
1052                (server_2_id.clone(), ContextServerStatus::Starting),
1053                (server_2_id.clone(), ContextServerStatus::Running),
1054                (server_2_id.clone(), ContextServerStatus::Stopped),
1055            ],
1056            cx,
1057        );
1058
1059        store.update(cx, |store, cx| store.test_start_server(server_1, cx));
1060
1061        cx.run_until_parked();
1062
1063        store.update(cx, |store, cx| {
1064            store.test_start_server(server_2.clone(), cx)
1065        });
1066
1067        cx.run_until_parked();
1068
1069        store
1070            .update(cx, |store, cx| store.stop_server(&server_2_id, cx))
1071            .unwrap();
1072    }
1073
1074    #[gpui::test(iterations = 25)]
1075    async fn test_context_server_concurrent_starts(cx: &mut TestAppContext) {
1076        const SERVER_1_ID: &str = "mcp-1";
1077
1078        let (_fs, project) = setup_context_server_test(cx, json!({"code.rs": ""}), vec![]).await;
1079
1080        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1081        let store = cx.new(|cx| {
1082            ContextServerStore::test(
1083                registry.clone(),
1084                project.read(cx).worktree_store(),
1085                Some(project.downgrade()),
1086                cx,
1087            )
1088        });
1089
1090        let server_id = ContextServerId(SERVER_1_ID.into());
1091
1092        let server_with_same_id_1 = Arc::new(ContextServer::new(
1093            server_id.clone(),
1094            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
1095        ));
1096        let server_with_same_id_2 = Arc::new(ContextServer::new(
1097            server_id.clone(),
1098            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
1099        ));
1100
1101        // If we start another server with the same id, we should report that we stopped the previous one
1102        let _server_events = assert_server_events(
1103            &store,
1104            vec![
1105                (server_id.clone(), ContextServerStatus::Starting),
1106                (server_id.clone(), ContextServerStatus::Stopped),
1107                (server_id.clone(), ContextServerStatus::Starting),
1108                (server_id.clone(), ContextServerStatus::Running),
1109            ],
1110            cx,
1111        );
1112
1113        store.update(cx, |store, cx| {
1114            store.test_start_server(server_with_same_id_1.clone(), cx)
1115        });
1116        store.update(cx, |store, cx| {
1117            store.test_start_server(server_with_same_id_2.clone(), cx)
1118        });
1119
1120        cx.run_until_parked();
1121
1122        cx.update(|cx| {
1123            assert_eq!(
1124                store.read(cx).status_for_server(&server_id),
1125                Some(ContextServerStatus::Running)
1126            );
1127        });
1128    }
1129
1130    #[gpui::test]
1131    async fn test_context_server_maintain_servers_loop(cx: &mut TestAppContext) {
1132        const SERVER_1_ID: &str = "mcp-1";
1133        const SERVER_2_ID: &str = "mcp-2";
1134
1135        let server_1_id = ContextServerId(SERVER_1_ID.into());
1136        let server_2_id = ContextServerId(SERVER_2_ID.into());
1137
1138        let fake_descriptor_1 = Arc::new(FakeContextServerDescriptor::new(SERVER_1_ID));
1139
1140        let (_fs, project) = setup_context_server_test(cx, json!({"code.rs": ""}), vec![]).await;
1141
1142        let executor = cx.executor();
1143        let store = project.read_with(cx, |project, _| project.context_server_store());
1144        store.update(cx, |store, cx| {
1145            store.set_context_server_factory(Box::new(move |id, _| {
1146                Arc::new(ContextServer::new(
1147                    id.clone(),
1148                    Arc::new(create_fake_transport(id.0.to_string(), executor.clone())),
1149                ))
1150            }));
1151            store.registry().update(cx, |registry, cx| {
1152                registry.register_context_server_descriptor(
1153                    SERVER_1_ID.into(),
1154                    fake_descriptor_1,
1155                    cx,
1156                );
1157            });
1158        });
1159
1160        set_context_server_configuration(
1161            vec![(
1162                server_1_id.0.clone(),
1163                settings::ContextServerSettingsContent::Extension {
1164                    enabled: true,
1165                    remote: false,
1166                    settings: json!({
1167                        "somevalue": true
1168                    }),
1169                },
1170            )],
1171            cx,
1172        );
1173
1174        // Ensure that mcp-1 starts up
1175        {
1176            let _server_events = assert_server_events(
1177                &store,
1178                vec![
1179                    (server_1_id.clone(), ContextServerStatus::Starting),
1180                    (server_1_id.clone(), ContextServerStatus::Running),
1181                ],
1182                cx,
1183            );
1184            cx.run_until_parked();
1185        }
1186
1187        // Ensure that mcp-1 is restarted when the configuration was changed
1188        {
1189            let _server_events = assert_server_events(
1190                &store,
1191                vec![
1192                    (server_1_id.clone(), ContextServerStatus::Stopped),
1193                    (server_1_id.clone(), ContextServerStatus::Starting),
1194                    (server_1_id.clone(), ContextServerStatus::Running),
1195                ],
1196                cx,
1197            );
1198            set_context_server_configuration(
1199                vec![(
1200                    server_1_id.0.clone(),
1201                    settings::ContextServerSettingsContent::Extension {
1202                        enabled: true,
1203                        remote: false,
1204                        settings: json!({
1205                            "somevalue": false
1206                        }),
1207                    },
1208                )],
1209                cx,
1210            );
1211
1212            cx.run_until_parked();
1213        }
1214
1215        // Ensure that mcp-1 is not restarted when the configuration was not changed
1216        {
1217            let _server_events = assert_server_events(&store, vec![], cx);
1218            set_context_server_configuration(
1219                vec![(
1220                    server_1_id.0.clone(),
1221                    settings::ContextServerSettingsContent::Extension {
1222                        enabled: true,
1223                        remote: false,
1224                        settings: json!({
1225                            "somevalue": false
1226                        }),
1227                    },
1228                )],
1229                cx,
1230            );
1231
1232            cx.run_until_parked();
1233        }
1234
1235        // Ensure that mcp-2 is started once it is added to the settings
1236        {
1237            let _server_events = assert_server_events(
1238                &store,
1239                vec![
1240                    (server_2_id.clone(), ContextServerStatus::Starting),
1241                    (server_2_id.clone(), ContextServerStatus::Running),
1242                ],
1243                cx,
1244            );
1245            set_context_server_configuration(
1246                vec![
1247                    (
1248                        server_1_id.0.clone(),
1249                        settings::ContextServerSettingsContent::Extension {
1250                            enabled: true,
1251                            remote: false,
1252                            settings: json!({
1253                                "somevalue": false
1254                            }),
1255                        },
1256                    ),
1257                    (
1258                        server_2_id.0.clone(),
1259                        settings::ContextServerSettingsContent::Stdio {
1260                            enabled: true,
1261                            remote: false,
1262                            command: ContextServerCommand {
1263                                path: "somebinary".into(),
1264                                args: vec!["arg".to_string()],
1265                                env: None,
1266                                timeout: None,
1267                            },
1268                        },
1269                    ),
1270                ],
1271                cx,
1272            );
1273
1274            cx.run_until_parked();
1275        }
1276
1277        // Ensure that mcp-2 is restarted once the args have changed
1278        {
1279            let _server_events = assert_server_events(
1280                &store,
1281                vec![
1282                    (server_2_id.clone(), ContextServerStatus::Stopped),
1283                    (server_2_id.clone(), ContextServerStatus::Starting),
1284                    (server_2_id.clone(), ContextServerStatus::Running),
1285                ],
1286                cx,
1287            );
1288            set_context_server_configuration(
1289                vec![
1290                    (
1291                        server_1_id.0.clone(),
1292                        settings::ContextServerSettingsContent::Extension {
1293                            enabled: true,
1294                            remote: false,
1295                            settings: json!({
1296                                "somevalue": false
1297                            }),
1298                        },
1299                    ),
1300                    (
1301                        server_2_id.0.clone(),
1302                        settings::ContextServerSettingsContent::Stdio {
1303                            enabled: true,
1304                            remote: false,
1305                            command: ContextServerCommand {
1306                                path: "somebinary".into(),
1307                                args: vec!["anotherArg".to_string()],
1308                                env: None,
1309                                timeout: None,
1310                            },
1311                        },
1312                    ),
1313                ],
1314                cx,
1315            );
1316
1317            cx.run_until_parked();
1318        }
1319
1320        // Ensure that mcp-2 is removed once it is removed from the settings
1321        {
1322            let _server_events = assert_server_events(
1323                &store,
1324                vec![(server_2_id.clone(), ContextServerStatus::Stopped)],
1325                cx,
1326            );
1327            set_context_server_configuration(
1328                vec![(
1329                    server_1_id.0.clone(),
1330                    settings::ContextServerSettingsContent::Extension {
1331                        enabled: true,
1332                        remote: false,
1333                        settings: json!({
1334                            "somevalue": false
1335                        }),
1336                    },
1337                )],
1338                cx,
1339            );
1340
1341            cx.run_until_parked();
1342
1343            cx.update(|cx| {
1344                assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
1345            });
1346        }
1347
1348        // Ensure that nothing happens if the settings do not change
1349        {
1350            let _server_events = assert_server_events(&store, vec![], cx);
1351            set_context_server_configuration(
1352                vec![(
1353                    server_1_id.0.clone(),
1354                    settings::ContextServerSettingsContent::Extension {
1355                        enabled: true,
1356                        remote: false,
1357                        settings: json!({
1358                            "somevalue": false
1359                        }),
1360                    },
1361                )],
1362                cx,
1363            );
1364
1365            cx.run_until_parked();
1366
1367            cx.update(|cx| {
1368                assert_eq!(
1369                    store.read(cx).status_for_server(&server_1_id),
1370                    Some(ContextServerStatus::Running)
1371                );
1372                assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
1373            });
1374        }
1375    }
1376
1377    #[gpui::test]
1378    async fn test_context_server_enabled_disabled(cx: &mut TestAppContext) {
1379        const SERVER_1_ID: &str = "mcp-1";
1380
1381        let server_1_id = ContextServerId(SERVER_1_ID.into());
1382
1383        let (_fs, project) = setup_context_server_test(cx, json!({"code.rs": ""}), vec![]).await;
1384
1385        let executor = cx.executor();
1386        let store = project.read_with(cx, |project, _| project.context_server_store());
1387        store.update(cx, |store, _| {
1388            store.set_context_server_factory(Box::new(move |id, _| {
1389                Arc::new(ContextServer::new(
1390                    id.clone(),
1391                    Arc::new(create_fake_transport(id.0.to_string(), executor.clone())),
1392                ))
1393            }));
1394        });
1395
1396        set_context_server_configuration(
1397            vec![(
1398                server_1_id.0.clone(),
1399                settings::ContextServerSettingsContent::Stdio {
1400                    enabled: true,
1401                    remote: false,
1402                    command: ContextServerCommand {
1403                        path: "somebinary".into(),
1404                        args: vec!["arg".to_string()],
1405                        env: None,
1406                        timeout: None,
1407                    },
1408                },
1409            )],
1410            cx,
1411        );
1412
1413        // Ensure that mcp-1 starts up
1414        {
1415            let _server_events = assert_server_events(
1416                &store,
1417                vec![
1418                    (server_1_id.clone(), ContextServerStatus::Starting),
1419                    (server_1_id.clone(), ContextServerStatus::Running),
1420                ],
1421                cx,
1422            );
1423            cx.run_until_parked();
1424        }
1425
1426        // Ensure that mcp-1 is stopped once it is disabled.
1427        {
1428            let _server_events = assert_server_events(
1429                &store,
1430                vec![(server_1_id.clone(), ContextServerStatus::Stopped)],
1431                cx,
1432            );
1433            set_context_server_configuration(
1434                vec![(
1435                    server_1_id.0.clone(),
1436                    settings::ContextServerSettingsContent::Stdio {
1437                        enabled: false,
1438                        remote: false,
1439                        command: ContextServerCommand {
1440                            path: "somebinary".into(),
1441                            args: vec!["arg".to_string()],
1442                            env: None,
1443                            timeout: None,
1444                        },
1445                    },
1446                )],
1447                cx,
1448            );
1449
1450            cx.run_until_parked();
1451        }
1452
1453        // Ensure that mcp-1 is started once it is enabled again.
1454        {
1455            let _server_events = assert_server_events(
1456                &store,
1457                vec![
1458                    (server_1_id.clone(), ContextServerStatus::Starting),
1459                    (server_1_id.clone(), ContextServerStatus::Running),
1460                ],
1461                cx,
1462            );
1463            set_context_server_configuration(
1464                vec![(
1465                    server_1_id.0.clone(),
1466                    settings::ContextServerSettingsContent::Stdio {
1467                        enabled: true,
1468                        remote: false,
1469                        command: ContextServerCommand {
1470                            path: "somebinary".into(),
1471                            args: vec!["arg".to_string()],
1472                            timeout: None,
1473                            env: None,
1474                        },
1475                    },
1476                )],
1477                cx,
1478            );
1479
1480            cx.run_until_parked();
1481        }
1482    }
1483
1484    fn set_context_server_configuration(
1485        context_servers: Vec<(Arc<str>, settings::ContextServerSettingsContent)>,
1486        cx: &mut TestAppContext,
1487    ) {
1488        cx.update(|cx| {
1489            SettingsStore::update_global(cx, |store, cx| {
1490                store.update_user_settings(cx, |content| {
1491                    content.project.context_servers.clear();
1492                    for (id, config) in context_servers {
1493                        content.project.context_servers.insert(id, config);
1494                    }
1495                });
1496            })
1497        });
1498    }
1499
1500    #[gpui::test]
1501    async fn test_remote_context_server(cx: &mut TestAppContext) {
1502        const SERVER_ID: &str = "remote-server";
1503        let server_id = ContextServerId(SERVER_ID.into());
1504        let server_url = "http://example.com/api";
1505
1506        let client = FakeHttpClient::create(|_| async move {
1507            use http_client::AsyncBody;
1508
1509            let response = Response::builder()
1510                .status(200)
1511                .header("Content-Type", "application/json")
1512                .body(AsyncBody::from(
1513                    serde_json::to_string(&json!({
1514                        "jsonrpc": "2.0",
1515                        "id": 0,
1516                        "result": {
1517                            "protocolVersion": "2024-11-05",
1518                            "capabilities": {},
1519                            "serverInfo": {
1520                                "name": "test-server",
1521                                "version": "1.0.0"
1522                            }
1523                        }
1524                    }))
1525                    .unwrap(),
1526                ))
1527                .unwrap();
1528            Ok(response)
1529        });
1530        cx.update(|cx| cx.set_http_client(client));
1531
1532        let (_fs, project) = setup_context_server_test(cx, json!({ "code.rs": "" }), vec![]).await;
1533
1534        let store = project.read_with(cx, |project, _| project.context_server_store());
1535
1536        set_context_server_configuration(
1537            vec![(
1538                server_id.0.clone(),
1539                settings::ContextServerSettingsContent::Http {
1540                    enabled: true,
1541                    url: server_url.to_string(),
1542                    headers: Default::default(),
1543                    timeout: None,
1544                },
1545            )],
1546            cx,
1547        );
1548
1549        let _server_events = assert_server_events(
1550            &store,
1551            vec![
1552                (server_id.clone(), ContextServerStatus::Starting),
1553                (server_id.clone(), ContextServerStatus::Running),
1554            ],
1555            cx,
1556        );
1557        cx.run_until_parked();
1558    }
1559
1560    struct ServerEvents {
1561        received_event_count: Rc<RefCell<usize>>,
1562        expected_event_count: usize,
1563        _subscription: Subscription,
1564    }
1565
1566    impl Drop for ServerEvents {
1567        fn drop(&mut self) {
1568            let actual_event_count = *self.received_event_count.borrow();
1569            assert_eq!(
1570                actual_event_count, self.expected_event_count,
1571                "
1572                Expected to receive {} context server store events, but received {} events",
1573                self.expected_event_count, actual_event_count
1574            );
1575        }
1576    }
1577
1578    #[gpui::test]
1579    async fn test_context_server_global_timeout(cx: &mut TestAppContext) {
1580        cx.update(|cx| {
1581            let settings_store = SettingsStore::test(cx);
1582            cx.set_global(settings_store);
1583            SettingsStore::update_global(cx, |store, cx| {
1584                store
1585                    .set_user_settings(r#"{"context_server_timeout": 90}"#, cx)
1586                    .expect("Failed to set test user settings");
1587            });
1588        });
1589
1590        let (_fs, project) = setup_context_server_test(cx, json!({"code.rs": ""}), vec![]).await;
1591
1592        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1593        let store = cx.new(|cx| {
1594            ContextServerStore::test(
1595                registry.clone(),
1596                project.read(cx).worktree_store(),
1597                Some(project.downgrade()),
1598                cx,
1599            )
1600        });
1601
1602        let mut async_cx = cx.to_async();
1603        let result = ContextServerStore::create_context_server(
1604            store.downgrade(),
1605            ContextServerId("test-server".into()),
1606            Arc::new(ContextServerConfiguration::Http {
1607                url: url::Url::parse("http://localhost:8080").expect("Failed to parse test URL"),
1608                headers: Default::default(),
1609                timeout: None,
1610            }),
1611            &mut async_cx,
1612        )
1613        .await;
1614
1615        assert!(
1616            result.is_ok(),
1617            "Server should be created successfully with global timeout"
1618        );
1619    }
1620
1621    #[gpui::test]
1622    async fn test_context_server_per_server_timeout_override(cx: &mut TestAppContext) {
1623        const SERVER_ID: &str = "test-server";
1624
1625        cx.update(|cx| {
1626            let settings_store = SettingsStore::test(cx);
1627            cx.set_global(settings_store);
1628            SettingsStore::update_global(cx, |store, cx| {
1629                store
1630                    .set_user_settings(r#"{"context_server_timeout": 60}"#, cx)
1631                    .expect("Failed to set test user settings");
1632            });
1633        });
1634
1635        let (_fs, project) = setup_context_server_test(
1636            cx,
1637            json!({"code.rs": ""}),
1638            vec![(
1639                SERVER_ID.into(),
1640                ContextServerSettings::Http {
1641                    enabled: true,
1642                    url: "http://localhost:8080".to_string(),
1643                    headers: Default::default(),
1644                    timeout: Some(120),
1645                },
1646            )],
1647        )
1648        .await;
1649
1650        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1651        let store = cx.new(|cx| {
1652            ContextServerStore::test(
1653                registry.clone(),
1654                project.read(cx).worktree_store(),
1655                Some(project.downgrade()),
1656                cx,
1657            )
1658        });
1659
1660        let mut async_cx = cx.to_async();
1661        let result = ContextServerStore::create_context_server(
1662            store.downgrade(),
1663            ContextServerId("test-server".into()),
1664            Arc::new(ContextServerConfiguration::Http {
1665                url: url::Url::parse("http://localhost:8080").expect("Failed to parse test URL"),
1666                headers: Default::default(),
1667                timeout: Some(120),
1668            }),
1669            &mut async_cx,
1670        )
1671        .await;
1672
1673        assert!(
1674            result.is_ok(),
1675            "Server should be created successfully with per-server timeout override"
1676        );
1677    }
1678
1679    #[gpui::test]
1680    async fn test_context_server_stdio_timeout(cx: &mut TestAppContext) {
1681        let (_fs, project) = setup_context_server_test(cx, json!({"code.rs": ""}), vec![]).await;
1682
1683        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1684        let store = cx.new(|cx| {
1685            ContextServerStore::test(
1686                registry.clone(),
1687                project.read(cx).worktree_store(),
1688                Some(project.downgrade()),
1689                cx,
1690            )
1691        });
1692
1693        let mut async_cx = cx.to_async();
1694        let result = ContextServerStore::create_context_server(
1695            store.downgrade(),
1696            ContextServerId("stdio-server".into()),
1697            Arc::new(ContextServerConfiguration::Custom {
1698                command: ContextServerCommand {
1699                    path: "/usr/bin/node".into(),
1700                    args: vec!["server.js".into()],
1701                    env: None,
1702                    timeout: Some(180000),
1703                },
1704                remote: false,
1705            }),
1706            &mut async_cx,
1707        )
1708        .await;
1709
1710        assert!(
1711            result.is_ok(),
1712            "Stdio server should be created successfully with timeout"
1713        );
1714    }
1715
1716    fn assert_server_events(
1717        store: &Entity<ContextServerStore>,
1718        expected_events: Vec<(ContextServerId, ContextServerStatus)>,
1719        cx: &mut TestAppContext,
1720    ) -> ServerEvents {
1721        cx.update(|cx| {
1722            let mut ix = 0;
1723            let received_event_count = Rc::new(RefCell::new(0));
1724            let expected_event_count = expected_events.len();
1725            let subscription = cx.subscribe(store, {
1726                let received_event_count = received_event_count.clone();
1727                move |_, event, _| match event {
1728                    Event::ServerStatusChanged {
1729                        server_id: actual_server_id,
1730                        status: actual_status,
1731                    } => {
1732                        let (expected_server_id, expected_status) = &expected_events[ix];
1733
1734                        assert_eq!(
1735                            actual_server_id, expected_server_id,
1736                            "Expected different server id at index {}",
1737                            ix
1738                        );
1739                        assert_eq!(
1740                            actual_status, expected_status,
1741                            "Expected different status at index {}",
1742                            ix
1743                        );
1744                        ix += 1;
1745                        *received_event_count.borrow_mut() += 1;
1746                    }
1747                }
1748            });
1749            ServerEvents {
1750                expected_event_count,
1751                received_event_count,
1752                _subscription: subscription,
1753            }
1754        })
1755    }
1756
1757    async fn setup_context_server_test(
1758        cx: &mut TestAppContext,
1759        files: serde_json::Value,
1760        context_server_configurations: Vec<(Arc<str>, ContextServerSettings)>,
1761    ) -> (Arc<FakeFs>, Entity<Project>) {
1762        cx.update(|cx| {
1763            let settings_store = SettingsStore::test(cx);
1764            cx.set_global(settings_store);
1765            let mut settings = ProjectSettings::get_global(cx).clone();
1766            for (id, config) in context_server_configurations {
1767                settings.context_servers.insert(id, config);
1768            }
1769            ProjectSettings::override_global(settings, cx);
1770        });
1771
1772        let fs = FakeFs::new(cx.executor());
1773        fs.insert_tree(path!("/test"), files).await;
1774        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
1775
1776        (fs, project)
1777    }
1778
1779    struct FakeContextServerDescriptor {
1780        path: PathBuf,
1781    }
1782
1783    impl FakeContextServerDescriptor {
1784        fn new(path: impl Into<PathBuf>) -> Self {
1785            Self { path: path.into() }
1786        }
1787    }
1788
1789    impl ContextServerDescriptor for FakeContextServerDescriptor {
1790        fn command(
1791            &self,
1792            _worktree_store: Entity<WorktreeStore>,
1793            _cx: &AsyncApp,
1794        ) -> Task<Result<ContextServerCommand>> {
1795            Task::ready(Ok(ContextServerCommand {
1796                path: self.path.clone(),
1797                args: vec!["arg1".to_string(), "arg2".to_string()],
1798                env: None,
1799                timeout: None,
1800            }))
1801        }
1802
1803        fn configuration(
1804            &self,
1805            _worktree_store: Entity<WorktreeStore>,
1806            _cx: &AsyncApp,
1807        ) -> Task<Result<Option<::extension::ContextServerConfiguration>>> {
1808            Task::ready(Ok(None))
1809        }
1810    }
1811}