context_server_store.rs

   1pub mod extension;
   2pub mod registry;
   3
   4use std::sync::Arc;
   5use std::time::Duration;
   6
   7use anyhow::{Context as _, Result};
   8use collections::{HashMap, HashSet};
   9use context_server::{ContextServer, ContextServerCommand, ContextServerId};
  10use futures::{FutureExt as _, future::join_all};
  11use gpui::{App, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, actions};
  12use registry::ContextServerDescriptorRegistry;
  13use settings::{Settings as _, SettingsStore};
  14use util::{ResultExt as _, rel_path::RelPath};
  15
  16use crate::{
  17    Project,
  18    project_settings::{ContextServerSettings, ProjectSettings},
  19    worktree_store::WorktreeStore,
  20};
  21
  22/// Maximum timeout for context server requests (10 minutes).
  23/// Prevents extremely large timeout values from tying up resources indefinitely.
  24const MAX_TIMEOUT_MS: u64 = 600_000;
  25
  26pub fn init(cx: &mut App) {
  27    extension::init(cx);
  28}
  29
  30actions!(
  31    context_server,
  32    [
  33        /// Restarts the context server.
  34        Restart
  35    ]
  36);
  37
  38#[derive(Debug, Clone, PartialEq, Eq, Hash)]
  39pub enum ContextServerStatus {
  40    Starting,
  41    Running,
  42    Stopped,
  43    Error(Arc<str>),
  44}
  45
  46impl ContextServerStatus {
  47    fn from_state(state: &ContextServerState) -> Self {
  48        match state {
  49            ContextServerState::Starting { .. } => ContextServerStatus::Starting,
  50            ContextServerState::Running { .. } => ContextServerStatus::Running,
  51            ContextServerState::Stopped { .. } => ContextServerStatus::Stopped,
  52            ContextServerState::Error { error, .. } => ContextServerStatus::Error(error.clone()),
  53        }
  54    }
  55}
  56
  57enum ContextServerState {
  58    Starting {
  59        server: Arc<ContextServer>,
  60        configuration: Arc<ContextServerConfiguration>,
  61        _task: Task<()>,
  62    },
  63    Running {
  64        server: Arc<ContextServer>,
  65        configuration: Arc<ContextServerConfiguration>,
  66    },
  67    Stopped {
  68        server: Arc<ContextServer>,
  69        configuration: Arc<ContextServerConfiguration>,
  70    },
  71    Error {
  72        server: Arc<ContextServer>,
  73        configuration: Arc<ContextServerConfiguration>,
  74        error: Arc<str>,
  75    },
  76}
  77
  78impl ContextServerState {
  79    pub fn server(&self) -> Arc<ContextServer> {
  80        match self {
  81            ContextServerState::Starting { server, .. } => server.clone(),
  82            ContextServerState::Running { server, .. } => server.clone(),
  83            ContextServerState::Stopped { server, .. } => server.clone(),
  84            ContextServerState::Error { server, .. } => server.clone(),
  85        }
  86    }
  87
  88    pub fn configuration(&self) -> Arc<ContextServerConfiguration> {
  89        match self {
  90            ContextServerState::Starting { configuration, .. } => configuration.clone(),
  91            ContextServerState::Running { configuration, .. } => configuration.clone(),
  92            ContextServerState::Stopped { configuration, .. } => configuration.clone(),
  93            ContextServerState::Error { configuration, .. } => configuration.clone(),
  94        }
  95    }
  96}
  97
  98#[derive(Debug, PartialEq, Eq)]
  99pub enum ContextServerConfiguration {
 100    Custom {
 101        command: ContextServerCommand,
 102    },
 103    Extension {
 104        command: ContextServerCommand,
 105        settings: serde_json::Value,
 106    },
 107    Http {
 108        url: url::Url,
 109        headers: HashMap<String, String>,
 110        timeout: Option<u64>,
 111    },
 112}
 113
 114impl ContextServerConfiguration {
 115    pub fn command(&self) -> Option<&ContextServerCommand> {
 116        match self {
 117            ContextServerConfiguration::Custom { command } => Some(command),
 118            ContextServerConfiguration::Extension { command, .. } => Some(command),
 119            ContextServerConfiguration::Http { .. } => None,
 120        }
 121    }
 122
 123    pub async fn from_settings(
 124        settings: ContextServerSettings,
 125        id: ContextServerId,
 126        registry: Entity<ContextServerDescriptorRegistry>,
 127        worktree_store: Entity<WorktreeStore>,
 128        cx: &AsyncApp,
 129    ) -> Option<Self> {
 130        match settings {
 131            ContextServerSettings::Stdio {
 132                enabled: _,
 133                command,
 134            } => Some(ContextServerConfiguration::Custom { command }),
 135            ContextServerSettings::Extension {
 136                enabled: _,
 137                settings,
 138            } => {
 139                let descriptor = cx
 140                    .update(|cx| registry.read(cx).context_server_descriptor(&id.0))
 141                    .ok()
 142                    .flatten()?;
 143
 144                match descriptor.command(worktree_store, cx).await {
 145                    Ok(command) => {
 146                        Some(ContextServerConfiguration::Extension { command, settings })
 147                    }
 148                    Err(e) => {
 149                        log::error!(
 150                            "Failed to create context server configuration from settings: {e:#}"
 151                        );
 152                        None
 153                    }
 154                }
 155            }
 156            ContextServerSettings::Http {
 157                enabled: _,
 158                url,
 159                headers: auth,
 160                timeout,
 161            } => {
 162                let url = url::Url::parse(&url).log_err()?;
 163                Some(ContextServerConfiguration::Http {
 164                    url,
 165                    headers: auth,
 166                    timeout,
 167                })
 168            }
 169        }
 170    }
 171}
 172
 173pub type ContextServerFactory =
 174    Box<dyn Fn(ContextServerId, Arc<ContextServerConfiguration>) -> Arc<ContextServer>>;
 175
 176pub struct ContextServerStore {
 177    context_server_settings: HashMap<Arc<str>, ContextServerSettings>,
 178    servers: HashMap<ContextServerId, ContextServerState>,
 179    worktree_store: Entity<WorktreeStore>,
 180    project: WeakEntity<Project>,
 181    registry: Entity<ContextServerDescriptorRegistry>,
 182    update_servers_task: Option<Task<Result<()>>>,
 183    context_server_factory: Option<ContextServerFactory>,
 184    needs_server_update: bool,
 185    _subscriptions: Vec<Subscription>,
 186}
 187
 188pub enum Event {
 189    ServerStatusChanged {
 190        server_id: ContextServerId,
 191        status: ContextServerStatus,
 192    },
 193}
 194
 195impl EventEmitter<Event> for ContextServerStore {}
 196
 197impl ContextServerStore {
 198    pub fn new(
 199        worktree_store: Entity<WorktreeStore>,
 200        weak_project: WeakEntity<Project>,
 201        cx: &mut Context<Self>,
 202    ) -> Self {
 203        Self::new_internal(
 204            true,
 205            None,
 206            ContextServerDescriptorRegistry::default_global(cx),
 207            worktree_store,
 208            weak_project,
 209            cx,
 210        )
 211    }
 212
 213    /// Returns all configured context server ids, excluding the ones that are disabled
 214    pub fn configured_server_ids(&self) -> Vec<ContextServerId> {
 215        self.context_server_settings
 216            .iter()
 217            .filter(|(_, settings)| settings.enabled())
 218            .map(|(id, _)| ContextServerId(id.clone()))
 219            .collect()
 220    }
 221
 222    #[cfg(any(test, feature = "test-support"))]
 223    pub fn test(
 224        registry: Entity<ContextServerDescriptorRegistry>,
 225        worktree_store: Entity<WorktreeStore>,
 226        weak_project: WeakEntity<Project>,
 227        cx: &mut Context<Self>,
 228    ) -> Self {
 229        Self::new_internal(false, None, registry, worktree_store, weak_project, cx)
 230    }
 231
 232    #[cfg(any(test, feature = "test-support"))]
 233    pub fn test_maintain_server_loop(
 234        context_server_factory: Option<ContextServerFactory>,
 235        registry: Entity<ContextServerDescriptorRegistry>,
 236        worktree_store: Entity<WorktreeStore>,
 237        weak_project: WeakEntity<Project>,
 238        cx: &mut Context<Self>,
 239    ) -> Self {
 240        Self::new_internal(
 241            true,
 242            context_server_factory,
 243            registry,
 244            worktree_store,
 245            weak_project,
 246            cx,
 247        )
 248    }
 249
 250    fn new_internal(
 251        maintain_server_loop: bool,
 252        context_server_factory: Option<ContextServerFactory>,
 253        registry: Entity<ContextServerDescriptorRegistry>,
 254        worktree_store: Entity<WorktreeStore>,
 255        weak_project: WeakEntity<Project>,
 256        cx: &mut Context<Self>,
 257    ) -> Self {
 258        let subscriptions = if maintain_server_loop {
 259            vec![
 260                cx.observe(&registry, |this, _registry, cx| {
 261                    this.available_context_servers_changed(cx);
 262                }),
 263                cx.observe_global::<SettingsStore>(|this, cx| {
 264                    let settings = Self::resolve_context_server_settings(&this.worktree_store, cx);
 265                    if &this.context_server_settings == settings {
 266                        return;
 267                    }
 268                    this.context_server_settings = settings.clone();
 269                    this.available_context_servers_changed(cx);
 270                }),
 271            ]
 272        } else {
 273            Vec::new()
 274        };
 275
 276        let mut this = Self {
 277            _subscriptions: subscriptions,
 278            context_server_settings: Self::resolve_context_server_settings(&worktree_store, cx)
 279                .clone(),
 280            worktree_store,
 281            project: weak_project,
 282            registry,
 283            needs_server_update: false,
 284            servers: HashMap::default(),
 285            update_servers_task: None,
 286            context_server_factory,
 287        };
 288        if maintain_server_loop {
 289            this.available_context_servers_changed(cx);
 290        }
 291        this
 292    }
 293
 294    pub fn get_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
 295        self.servers.get(id).map(|state| state.server())
 296    }
 297
 298    pub fn get_running_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
 299        if let Some(ContextServerState::Running { server, .. }) = self.servers.get(id) {
 300            Some(server.clone())
 301        } else {
 302            None
 303        }
 304    }
 305
 306    pub fn status_for_server(&self, id: &ContextServerId) -> Option<ContextServerStatus> {
 307        self.servers.get(id).map(ContextServerStatus::from_state)
 308    }
 309
 310    pub fn configuration_for_server(
 311        &self,
 312        id: &ContextServerId,
 313    ) -> Option<Arc<ContextServerConfiguration>> {
 314        self.servers.get(id).map(|state| state.configuration())
 315    }
 316
 317    pub fn server_ids(&self, cx: &App) -> HashSet<ContextServerId> {
 318        self.servers
 319            .keys()
 320            .cloned()
 321            .chain(
 322                self.registry
 323                    .read(cx)
 324                    .context_server_descriptors()
 325                    .into_iter()
 326                    .map(|(id, _)| ContextServerId(id)),
 327            )
 328            .collect()
 329    }
 330
 331    pub fn running_servers(&self) -> Vec<Arc<ContextServer>> {
 332        self.servers
 333            .values()
 334            .filter_map(|state| {
 335                if let ContextServerState::Running { server, .. } = state {
 336                    Some(server.clone())
 337                } else {
 338                    None
 339                }
 340            })
 341            .collect()
 342    }
 343
 344    pub fn start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
 345        cx.spawn(async move |this, cx| {
 346            let this = this.upgrade().context("Context server store dropped")?;
 347            let settings = this
 348                .update(cx, |this, _| {
 349                    this.context_server_settings.get(&server.id().0).cloned()
 350                })
 351                .ok()
 352                .flatten()
 353                .context("Failed to get context server settings")?;
 354
 355            if !settings.enabled() {
 356                return Ok(());
 357            }
 358
 359            let (registry, worktree_store) = this.update(cx, |this, _| {
 360                (this.registry.clone(), this.worktree_store.clone())
 361            })?;
 362            let configuration = ContextServerConfiguration::from_settings(
 363                settings,
 364                server.id(),
 365                registry,
 366                worktree_store,
 367                cx,
 368            )
 369            .await
 370            .context("Failed to create context server configuration")?;
 371
 372            this.update(cx, |this, cx| {
 373                this.run_server(server, Arc::new(configuration), cx)
 374            })
 375        })
 376        .detach_and_log_err(cx);
 377    }
 378
 379    pub fn stop_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
 380        if matches!(
 381            self.servers.get(id),
 382            Some(ContextServerState::Stopped { .. })
 383        ) {
 384            return Ok(());
 385        }
 386
 387        let state = self
 388            .servers
 389            .remove(id)
 390            .context("Context server not found")?;
 391
 392        let server = state.server();
 393        let configuration = state.configuration();
 394        let mut result = Ok(());
 395        if let ContextServerState::Running { server, .. } = &state {
 396            result = server.stop();
 397        }
 398        drop(state);
 399
 400        self.update_server_state(
 401            id.clone(),
 402            ContextServerState::Stopped {
 403                configuration,
 404                server,
 405            },
 406            cx,
 407        );
 408
 409        result
 410    }
 411
 412    fn run_server(
 413        &mut self,
 414        server: Arc<ContextServer>,
 415        configuration: Arc<ContextServerConfiguration>,
 416        cx: &mut Context<Self>,
 417    ) {
 418        let id = server.id();
 419        if matches!(
 420            self.servers.get(&id),
 421            Some(ContextServerState::Starting { .. } | ContextServerState::Running { .. })
 422        ) {
 423            self.stop_server(&id, cx).log_err();
 424        }
 425        let task = cx.spawn({
 426            let id = server.id();
 427            let server = server.clone();
 428            let configuration = configuration.clone();
 429
 430            async move |this, cx| {
 431                match server.clone().start(cx).await {
 432                    Ok(_) => {
 433                        debug_assert!(server.client().is_some());
 434
 435                        this.update(cx, |this, cx| {
 436                            this.update_server_state(
 437                                id.clone(),
 438                                ContextServerState::Running {
 439                                    server,
 440                                    configuration,
 441                                },
 442                                cx,
 443                            )
 444                        })
 445                        .log_err()
 446                    }
 447                    Err(err) => {
 448                        log::error!("{} context server failed to start: {}", id, err);
 449                        this.update(cx, |this, cx| {
 450                            this.update_server_state(
 451                                id.clone(),
 452                                ContextServerState::Error {
 453                                    configuration,
 454                                    server,
 455                                    error: err.to_string().into(),
 456                                },
 457                                cx,
 458                            )
 459                        })
 460                        .log_err()
 461                    }
 462                };
 463            }
 464        });
 465
 466        self.update_server_state(
 467            id.clone(),
 468            ContextServerState::Starting {
 469                configuration,
 470                _task: task,
 471                server,
 472            },
 473            cx,
 474        );
 475    }
 476
 477    fn remove_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
 478        let state = self
 479            .servers
 480            .remove(id)
 481            .context("Context server not found")?;
 482        drop(state);
 483        cx.emit(Event::ServerStatusChanged {
 484            server_id: id.clone(),
 485            status: ContextServerStatus::Stopped,
 486        });
 487        Ok(())
 488    }
 489
 490    fn create_context_server(
 491        &self,
 492        id: ContextServerId,
 493        configuration: Arc<ContextServerConfiguration>,
 494        cx: &mut Context<Self>,
 495    ) -> Result<Arc<ContextServer>> {
 496        // Get global timeout from settings
 497        let global_timeout = ProjectSettings::get_global(cx).context_server_timeout;
 498
 499        if let Some(factory) = self.context_server_factory.as_ref() {
 500            return Ok(factory(id, configuration));
 501        }
 502
 503        match configuration.as_ref() {
 504            ContextServerConfiguration::Http {
 505                url,
 506                headers,
 507                timeout,
 508            } => {
 509                // Apply timeout precedence for HTTP servers: per-server > global
 510                // Cap at MAX_TIMEOUT_MS to prevent extremely large timeout values
 511                let resolved_timeout = timeout.unwrap_or(global_timeout).min(MAX_TIMEOUT_MS);
 512
 513                Ok(Arc::new(ContextServer::http(
 514                    id,
 515                    url,
 516                    headers.clone(),
 517                    cx.http_client(),
 518                    cx.background_executor().clone(),
 519                    Some(Duration::from_millis(resolved_timeout)),
 520                )?))
 521            }
 522            _ => {
 523                let root_path = self
 524                    .project
 525                    .read_with(cx, |project, cx| project.active_project_directory(cx))
 526                    .ok()
 527                    .flatten()
 528                    .or_else(|| {
 529                        self.worktree_store.read_with(cx, |store, cx| {
 530                            store.visible_worktrees(cx).fold(None, |acc, item| {
 531                                if acc.is_none() {
 532                                    item.read(cx).root_dir()
 533                                } else {
 534                                    acc
 535                                }
 536                            })
 537                        })
 538                    });
 539
 540                // Apply timeout precedence for stdio servers: per-server > global
 541                // Cap at MAX_TIMEOUT_MS to prevent extremely large timeout values
 542                let mut command_with_timeout = configuration
 543                    .command()
 544                    .context("Missing command configuration for stdio context server")?
 545                    .clone();
 546                if command_with_timeout.timeout.is_none() {
 547                    command_with_timeout.timeout = Some(global_timeout.min(MAX_TIMEOUT_MS));
 548                } else {
 549                    command_with_timeout.timeout =
 550                        command_with_timeout.timeout.map(|t| t.min(MAX_TIMEOUT_MS));
 551                }
 552
 553                Ok(Arc::new(ContextServer::stdio(
 554                    id,
 555                    command_with_timeout,
 556                    root_path,
 557                )))
 558            }
 559        }
 560    }
 561
 562    fn resolve_context_server_settings<'a>(
 563        worktree_store: &'a Entity<WorktreeStore>,
 564        cx: &'a App,
 565    ) -> &'a HashMap<Arc<str>, ContextServerSettings> {
 566        let location = worktree_store
 567            .read(cx)
 568            .visible_worktrees(cx)
 569            .next()
 570            .map(|worktree| settings::SettingsLocation {
 571                worktree_id: worktree.read(cx).id(),
 572                path: RelPath::empty(),
 573            });
 574        &ProjectSettings::get(location, cx).context_servers
 575    }
 576
 577    fn update_server_state(
 578        &mut self,
 579        id: ContextServerId,
 580        state: ContextServerState,
 581        cx: &mut Context<Self>,
 582    ) {
 583        let status = ContextServerStatus::from_state(&state);
 584        self.servers.insert(id.clone(), state);
 585        cx.emit(Event::ServerStatusChanged {
 586            server_id: id,
 587            status,
 588        });
 589    }
 590
 591    fn available_context_servers_changed(&mut self, cx: &mut Context<Self>) {
 592        if self.update_servers_task.is_some() {
 593            self.needs_server_update = true;
 594        } else {
 595            self.needs_server_update = false;
 596            self.update_servers_task = Some(cx.spawn(async move |this, cx| {
 597                if let Err(err) = Self::maintain_servers(this.clone(), cx).await {
 598                    log::error!("Error maintaining context servers: {}", err);
 599                }
 600
 601                this.update(cx, |this, cx| {
 602                    this.update_servers_task.take();
 603                    if this.needs_server_update {
 604                        this.available_context_servers_changed(cx);
 605                    }
 606                })?;
 607
 608                Ok(())
 609            }));
 610        }
 611    }
 612
 613    async fn maintain_servers(this: WeakEntity<Self>, cx: &mut AsyncApp) -> Result<()> {
 614        let (mut configured_servers, registry, worktree_store) = this.update(cx, |this, _| {
 615            (
 616                this.context_server_settings.clone(),
 617                this.registry.clone(),
 618                this.worktree_store.clone(),
 619            )
 620        })?;
 621
 622        for (id, _) in
 623            registry.read_with(cx, |registry, _| registry.context_server_descriptors())?
 624        {
 625            configured_servers
 626                .entry(id)
 627                .or_insert(ContextServerSettings::default_extension());
 628        }
 629
 630        let (enabled_servers, disabled_servers): (HashMap<_, _>, HashMap<_, _>) =
 631            configured_servers
 632                .into_iter()
 633                .partition(|(_, settings)| settings.enabled());
 634
 635        let configured_servers = join_all(enabled_servers.into_iter().map(|(id, settings)| {
 636            let id = ContextServerId(id);
 637            ContextServerConfiguration::from_settings(
 638                settings,
 639                id.clone(),
 640                registry.clone(),
 641                worktree_store.clone(),
 642                cx,
 643            )
 644            .map(|config| (id, config))
 645        }))
 646        .await
 647        .into_iter()
 648        .filter_map(|(id, config)| config.map(|config| (id, config)))
 649        .collect::<HashMap<_, _>>();
 650
 651        let mut servers_to_start = Vec::new();
 652        let mut servers_to_remove = HashSet::default();
 653        let mut servers_to_stop = HashSet::default();
 654
 655        this.update(cx, |this, cx| {
 656            for server_id in this.servers.keys() {
 657                // All servers that are not in desired_servers should be removed from the store.
 658                // This can happen if the user removed a server from the context server settings.
 659                if !configured_servers.contains_key(server_id) {
 660                    if disabled_servers.contains_key(&server_id.0) {
 661                        servers_to_stop.insert(server_id.clone());
 662                    } else {
 663                        servers_to_remove.insert(server_id.clone());
 664                    }
 665                }
 666            }
 667
 668            for (id, config) in configured_servers {
 669                let state = this.servers.get(&id);
 670                let is_stopped = matches!(state, Some(ContextServerState::Stopped { .. }));
 671                let existing_config = state.as_ref().map(|state| state.configuration());
 672                if existing_config.as_deref() != Some(&config) || is_stopped {
 673                    let config = Arc::new(config);
 674                    let server = this.create_context_server(id.clone(), config.clone(), cx)?;
 675                    servers_to_start.push((server, config));
 676                    if this.servers.contains_key(&id) {
 677                        servers_to_stop.insert(id);
 678                    }
 679                }
 680            }
 681
 682            anyhow::Ok(())
 683        })??;
 684
 685        this.update(cx, |this, cx| {
 686            for id in servers_to_stop {
 687                this.stop_server(&id, cx)?;
 688            }
 689            for id in servers_to_remove {
 690                this.remove_server(&id, cx)?;
 691            }
 692            for (server, config) in servers_to_start {
 693                this.run_server(server, config, cx);
 694            }
 695            anyhow::Ok(())
 696        })?
 697    }
 698}
 699
 700#[cfg(test)]
 701mod tests {
 702    use super::*;
 703    use crate::{
 704        FakeFs, Project, context_server_store::registry::ContextServerDescriptor,
 705        project_settings::ProjectSettings,
 706    };
 707    use context_server::test::create_fake_transport;
 708    use gpui::{AppContext, TestAppContext, UpdateGlobal as _};
 709    use http_client::{FakeHttpClient, Response};
 710    use serde_json::json;
 711    use std::{cell::RefCell, path::PathBuf, rc::Rc};
 712    use util::path;
 713
 714    #[gpui::test]
 715    async fn test_context_server_status(cx: &mut TestAppContext) {
 716        const SERVER_1_ID: &str = "mcp-1";
 717        const SERVER_2_ID: &str = "mcp-2";
 718
 719        let (_fs, project) = setup_context_server_test(
 720            cx,
 721            json!({"code.rs": ""}),
 722            vec![
 723                (SERVER_1_ID.into(), dummy_server_settings()),
 724                (SERVER_2_ID.into(), dummy_server_settings()),
 725            ],
 726        )
 727        .await;
 728
 729        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
 730        let store = cx.new(|cx| {
 731            ContextServerStore::test(
 732                registry.clone(),
 733                project.read(cx).worktree_store(),
 734                project.downgrade(),
 735                cx,
 736            )
 737        });
 738
 739        let server_1_id = ContextServerId(SERVER_1_ID.into());
 740        let server_2_id = ContextServerId(SERVER_2_ID.into());
 741
 742        let server_1 = Arc::new(ContextServer::new(
 743            server_1_id.clone(),
 744            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
 745        ));
 746        let server_2 = Arc::new(ContextServer::new(
 747            server_2_id.clone(),
 748            Arc::new(create_fake_transport(SERVER_2_ID, cx.executor())),
 749        ));
 750
 751        store.update(cx, |store, cx| store.start_server(server_1, cx));
 752
 753        cx.run_until_parked();
 754
 755        cx.update(|cx| {
 756            assert_eq!(
 757                store.read(cx).status_for_server(&server_1_id),
 758                Some(ContextServerStatus::Running)
 759            );
 760            assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
 761        });
 762
 763        store.update(cx, |store, cx| store.start_server(server_2.clone(), cx));
 764
 765        cx.run_until_parked();
 766
 767        cx.update(|cx| {
 768            assert_eq!(
 769                store.read(cx).status_for_server(&server_1_id),
 770                Some(ContextServerStatus::Running)
 771            );
 772            assert_eq!(
 773                store.read(cx).status_for_server(&server_2_id),
 774                Some(ContextServerStatus::Running)
 775            );
 776        });
 777
 778        store
 779            .update(cx, |store, cx| store.stop_server(&server_2_id, cx))
 780            .unwrap();
 781
 782        cx.update(|cx| {
 783            assert_eq!(
 784                store.read(cx).status_for_server(&server_1_id),
 785                Some(ContextServerStatus::Running)
 786            );
 787            assert_eq!(
 788                store.read(cx).status_for_server(&server_2_id),
 789                Some(ContextServerStatus::Stopped)
 790            );
 791        });
 792    }
 793
 794    #[gpui::test]
 795    async fn test_context_server_status_events(cx: &mut TestAppContext) {
 796        const SERVER_1_ID: &str = "mcp-1";
 797        const SERVER_2_ID: &str = "mcp-2";
 798
 799        let (_fs, project) = setup_context_server_test(
 800            cx,
 801            json!({"code.rs": ""}),
 802            vec![
 803                (SERVER_1_ID.into(), dummy_server_settings()),
 804                (SERVER_2_ID.into(), dummy_server_settings()),
 805            ],
 806        )
 807        .await;
 808
 809        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
 810        let store = cx.new(|cx| {
 811            ContextServerStore::test(
 812                registry.clone(),
 813                project.read(cx).worktree_store(),
 814                project.downgrade(),
 815                cx,
 816            )
 817        });
 818
 819        let server_1_id = ContextServerId(SERVER_1_ID.into());
 820        let server_2_id = ContextServerId(SERVER_2_ID.into());
 821
 822        let server_1 = Arc::new(ContextServer::new(
 823            server_1_id.clone(),
 824            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
 825        ));
 826        let server_2 = Arc::new(ContextServer::new(
 827            server_2_id.clone(),
 828            Arc::new(create_fake_transport(SERVER_2_ID, cx.executor())),
 829        ));
 830
 831        let _server_events = assert_server_events(
 832            &store,
 833            vec![
 834                (server_1_id.clone(), ContextServerStatus::Starting),
 835                (server_1_id, ContextServerStatus::Running),
 836                (server_2_id.clone(), ContextServerStatus::Starting),
 837                (server_2_id.clone(), ContextServerStatus::Running),
 838                (server_2_id.clone(), ContextServerStatus::Stopped),
 839            ],
 840            cx,
 841        );
 842
 843        store.update(cx, |store, cx| store.start_server(server_1, cx));
 844
 845        cx.run_until_parked();
 846
 847        store.update(cx, |store, cx| store.start_server(server_2.clone(), cx));
 848
 849        cx.run_until_parked();
 850
 851        store
 852            .update(cx, |store, cx| store.stop_server(&server_2_id, cx))
 853            .unwrap();
 854    }
 855
 856    #[gpui::test(iterations = 25)]
 857    async fn test_context_server_concurrent_starts(cx: &mut TestAppContext) {
 858        const SERVER_1_ID: &str = "mcp-1";
 859
 860        let (_fs, project) = setup_context_server_test(
 861            cx,
 862            json!({"code.rs": ""}),
 863            vec![(SERVER_1_ID.into(), dummy_server_settings())],
 864        )
 865        .await;
 866
 867        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
 868        let store = cx.new(|cx| {
 869            ContextServerStore::test(
 870                registry.clone(),
 871                project.read(cx).worktree_store(),
 872                project.downgrade(),
 873                cx,
 874            )
 875        });
 876
 877        let server_id = ContextServerId(SERVER_1_ID.into());
 878
 879        let server_with_same_id_1 = Arc::new(ContextServer::new(
 880            server_id.clone(),
 881            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
 882        ));
 883        let server_with_same_id_2 = Arc::new(ContextServer::new(
 884            server_id.clone(),
 885            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
 886        ));
 887
 888        // If we start another server with the same id, we should report that we stopped the previous one
 889        let _server_events = assert_server_events(
 890            &store,
 891            vec![
 892                (server_id.clone(), ContextServerStatus::Starting),
 893                (server_id.clone(), ContextServerStatus::Stopped),
 894                (server_id.clone(), ContextServerStatus::Starting),
 895                (server_id.clone(), ContextServerStatus::Running),
 896            ],
 897            cx,
 898        );
 899
 900        store.update(cx, |store, cx| {
 901            store.start_server(server_with_same_id_1.clone(), cx)
 902        });
 903        store.update(cx, |store, cx| {
 904            store.start_server(server_with_same_id_2.clone(), cx)
 905        });
 906
 907        cx.run_until_parked();
 908
 909        cx.update(|cx| {
 910            assert_eq!(
 911                store.read(cx).status_for_server(&server_id),
 912                Some(ContextServerStatus::Running)
 913            );
 914        });
 915    }
 916
 917    #[gpui::test]
 918    async fn test_context_server_maintain_servers_loop(cx: &mut TestAppContext) {
 919        const SERVER_1_ID: &str = "mcp-1";
 920        const SERVER_2_ID: &str = "mcp-2";
 921
 922        let server_1_id = ContextServerId(SERVER_1_ID.into());
 923        let server_2_id = ContextServerId(SERVER_2_ID.into());
 924
 925        let fake_descriptor_1 = Arc::new(FakeContextServerDescriptor::new(SERVER_1_ID));
 926
 927        let (_fs, project) = setup_context_server_test(
 928            cx,
 929            json!({"code.rs": ""}),
 930            vec![(
 931                SERVER_1_ID.into(),
 932                ContextServerSettings::Extension {
 933                    enabled: true,
 934                    settings: json!({
 935                        "somevalue": true
 936                    }),
 937                },
 938            )],
 939        )
 940        .await;
 941
 942        let executor = cx.executor();
 943        let registry = cx.new(|cx| {
 944            let mut registry = ContextServerDescriptorRegistry::new();
 945            registry.register_context_server_descriptor(SERVER_1_ID.into(), fake_descriptor_1, cx);
 946            registry
 947        });
 948        let store = cx.new(|cx| {
 949            ContextServerStore::test_maintain_server_loop(
 950                Some(Box::new(move |id, _| {
 951                    Arc::new(ContextServer::new(
 952                        id.clone(),
 953                        Arc::new(create_fake_transport(id.0.to_string(), executor.clone())),
 954                    ))
 955                })),
 956                registry.clone(),
 957                project.read(cx).worktree_store(),
 958                project.downgrade(),
 959                cx,
 960            )
 961        });
 962
 963        // Ensure that mcp-1 starts up
 964        {
 965            let _server_events = assert_server_events(
 966                &store,
 967                vec![
 968                    (server_1_id.clone(), ContextServerStatus::Starting),
 969                    (server_1_id.clone(), ContextServerStatus::Running),
 970                ],
 971                cx,
 972            );
 973            cx.run_until_parked();
 974        }
 975
 976        // Ensure that mcp-1 is restarted when the configuration was changed
 977        {
 978            let _server_events = assert_server_events(
 979                &store,
 980                vec![
 981                    (server_1_id.clone(), ContextServerStatus::Stopped),
 982                    (server_1_id.clone(), ContextServerStatus::Starting),
 983                    (server_1_id.clone(), ContextServerStatus::Running),
 984                ],
 985                cx,
 986            );
 987            set_context_server_configuration(
 988                vec![(
 989                    server_1_id.0.clone(),
 990                    settings::ContextServerSettingsContent::Extension {
 991                        enabled: true,
 992                        settings: json!({
 993                            "somevalue": false
 994                        }),
 995                    },
 996                )],
 997                cx,
 998            );
 999
1000            cx.run_until_parked();
1001        }
1002
1003        // Ensure that mcp-1 is not restarted when the configuration was not changed
1004        {
1005            let _server_events = assert_server_events(&store, vec![], cx);
1006            set_context_server_configuration(
1007                vec![(
1008                    server_1_id.0.clone(),
1009                    settings::ContextServerSettingsContent::Extension {
1010                        enabled: true,
1011                        settings: json!({
1012                            "somevalue": false
1013                        }),
1014                    },
1015                )],
1016                cx,
1017            );
1018
1019            cx.run_until_parked();
1020        }
1021
1022        // Ensure that mcp-2 is started once it is added to the settings
1023        {
1024            let _server_events = assert_server_events(
1025                &store,
1026                vec![
1027                    (server_2_id.clone(), ContextServerStatus::Starting),
1028                    (server_2_id.clone(), ContextServerStatus::Running),
1029                ],
1030                cx,
1031            );
1032            set_context_server_configuration(
1033                vec![
1034                    (
1035                        server_1_id.0.clone(),
1036                        settings::ContextServerSettingsContent::Extension {
1037                            enabled: true,
1038                            settings: json!({
1039                                "somevalue": false
1040                            }),
1041                        },
1042                    ),
1043                    (
1044                        server_2_id.0.clone(),
1045                        settings::ContextServerSettingsContent::Stdio {
1046                            enabled: true,
1047                            command: ContextServerCommand {
1048                                path: "somebinary".into(),
1049                                args: vec!["arg".to_string()],
1050                                env: None,
1051                                timeout: None,
1052                            },
1053                        },
1054                    ),
1055                ],
1056                cx,
1057            );
1058
1059            cx.run_until_parked();
1060        }
1061
1062        // Ensure that mcp-2 is restarted once the args have changed
1063        {
1064            let _server_events = assert_server_events(
1065                &store,
1066                vec![
1067                    (server_2_id.clone(), ContextServerStatus::Stopped),
1068                    (server_2_id.clone(), ContextServerStatus::Starting),
1069                    (server_2_id.clone(), ContextServerStatus::Running),
1070                ],
1071                cx,
1072            );
1073            set_context_server_configuration(
1074                vec![
1075                    (
1076                        server_1_id.0.clone(),
1077                        settings::ContextServerSettingsContent::Extension {
1078                            enabled: true,
1079                            settings: json!({
1080                                "somevalue": false
1081                            }),
1082                        },
1083                    ),
1084                    (
1085                        server_2_id.0.clone(),
1086                        settings::ContextServerSettingsContent::Stdio {
1087                            enabled: true,
1088                            command: ContextServerCommand {
1089                                path: "somebinary".into(),
1090                                args: vec!["anotherArg".to_string()],
1091                                env: None,
1092                                timeout: None,
1093                            },
1094                        },
1095                    ),
1096                ],
1097                cx,
1098            );
1099
1100            cx.run_until_parked();
1101        }
1102
1103        // Ensure that mcp-2 is removed once it is removed from the settings
1104        {
1105            let _server_events = assert_server_events(
1106                &store,
1107                vec![(server_2_id.clone(), ContextServerStatus::Stopped)],
1108                cx,
1109            );
1110            set_context_server_configuration(
1111                vec![(
1112                    server_1_id.0.clone(),
1113                    settings::ContextServerSettingsContent::Extension {
1114                        enabled: true,
1115                        settings: json!({
1116                            "somevalue": false
1117                        }),
1118                    },
1119                )],
1120                cx,
1121            );
1122
1123            cx.run_until_parked();
1124
1125            cx.update(|cx| {
1126                assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
1127            });
1128        }
1129
1130        // Ensure that nothing happens if the settings do not change
1131        {
1132            let _server_events = assert_server_events(&store, vec![], cx);
1133            set_context_server_configuration(
1134                vec![(
1135                    server_1_id.0.clone(),
1136                    settings::ContextServerSettingsContent::Extension {
1137                        enabled: true,
1138                        settings: json!({
1139                            "somevalue": false
1140                        }),
1141                    },
1142                )],
1143                cx,
1144            );
1145
1146            cx.run_until_parked();
1147
1148            cx.update(|cx| {
1149                assert_eq!(
1150                    store.read(cx).status_for_server(&server_1_id),
1151                    Some(ContextServerStatus::Running)
1152                );
1153                assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
1154            });
1155        }
1156    }
1157
1158    #[gpui::test]
1159    async fn test_context_server_enabled_disabled(cx: &mut TestAppContext) {
1160        const SERVER_1_ID: &str = "mcp-1";
1161
1162        let server_1_id = ContextServerId(SERVER_1_ID.into());
1163
1164        let (_fs, project) = setup_context_server_test(
1165            cx,
1166            json!({"code.rs": ""}),
1167            vec![(
1168                SERVER_1_ID.into(),
1169                ContextServerSettings::Stdio {
1170                    enabled: true,
1171                    command: ContextServerCommand {
1172                        path: "somebinary".into(),
1173                        args: vec!["arg".to_string()],
1174                        env: None,
1175                        timeout: None,
1176                    },
1177                },
1178            )],
1179        )
1180        .await;
1181
1182        let executor = cx.executor();
1183        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1184        let store = cx.new(|cx| {
1185            ContextServerStore::test_maintain_server_loop(
1186                Some(Box::new(move |id, _| {
1187                    Arc::new(ContextServer::new(
1188                        id.clone(),
1189                        Arc::new(create_fake_transport(id.0.to_string(), executor.clone())),
1190                    ))
1191                })),
1192                registry.clone(),
1193                project.read(cx).worktree_store(),
1194                project.downgrade(),
1195                cx,
1196            )
1197        });
1198
1199        // Ensure that mcp-1 starts up
1200        {
1201            let _server_events = assert_server_events(
1202                &store,
1203                vec![
1204                    (server_1_id.clone(), ContextServerStatus::Starting),
1205                    (server_1_id.clone(), ContextServerStatus::Running),
1206                ],
1207                cx,
1208            );
1209            cx.run_until_parked();
1210        }
1211
1212        // Ensure that mcp-1 is stopped once it is disabled.
1213        {
1214            let _server_events = assert_server_events(
1215                &store,
1216                vec![(server_1_id.clone(), ContextServerStatus::Stopped)],
1217                cx,
1218            );
1219            set_context_server_configuration(
1220                vec![(
1221                    server_1_id.0.clone(),
1222                    settings::ContextServerSettingsContent::Stdio {
1223                        enabled: false,
1224                        command: ContextServerCommand {
1225                            path: "somebinary".into(),
1226                            args: vec!["arg".to_string()],
1227                            env: None,
1228                            timeout: None,
1229                        },
1230                    },
1231                )],
1232                cx,
1233            );
1234
1235            cx.run_until_parked();
1236        }
1237
1238        // Ensure that mcp-1 is started once it is enabled again.
1239        {
1240            let _server_events = assert_server_events(
1241                &store,
1242                vec![
1243                    (server_1_id.clone(), ContextServerStatus::Starting),
1244                    (server_1_id.clone(), ContextServerStatus::Running),
1245                ],
1246                cx,
1247            );
1248            set_context_server_configuration(
1249                vec![(
1250                    server_1_id.0.clone(),
1251                    settings::ContextServerSettingsContent::Stdio {
1252                        enabled: true,
1253                        command: ContextServerCommand {
1254                            path: "somebinary".into(),
1255                            args: vec!["arg".to_string()],
1256                            timeout: None,
1257                            env: None,
1258                        },
1259                    },
1260                )],
1261                cx,
1262            );
1263
1264            cx.run_until_parked();
1265        }
1266    }
1267
1268    fn set_context_server_configuration(
1269        context_servers: Vec<(Arc<str>, settings::ContextServerSettingsContent)>,
1270        cx: &mut TestAppContext,
1271    ) {
1272        cx.update(|cx| {
1273            SettingsStore::update_global(cx, |store, cx| {
1274                store.update_user_settings(cx, |content| {
1275                    content.project.context_servers.clear();
1276                    for (id, config) in context_servers {
1277                        content.project.context_servers.insert(id, config);
1278                    }
1279                });
1280            })
1281        });
1282    }
1283
1284    #[gpui::test]
1285    async fn test_remote_context_server(cx: &mut TestAppContext) {
1286        const SERVER_ID: &str = "remote-server";
1287        let server_id = ContextServerId(SERVER_ID.into());
1288        let server_url = "http://example.com/api";
1289
1290        let (_fs, project) = setup_context_server_test(
1291            cx,
1292            json!({ "code.rs": "" }),
1293            vec![(
1294                SERVER_ID.into(),
1295                ContextServerSettings::Http {
1296                    enabled: true,
1297                    url: server_url.to_string(),
1298                    headers: Default::default(),
1299                    timeout: None,
1300                },
1301            )],
1302        )
1303        .await;
1304
1305        let client = FakeHttpClient::create(|_| async move {
1306            use http_client::AsyncBody;
1307
1308            let response = Response::builder()
1309                .status(200)
1310                .header("Content-Type", "application/json")
1311                .body(AsyncBody::from(
1312                    serde_json::to_string(&json!({
1313                        "jsonrpc": "2.0",
1314                        "id": 0,
1315                        "result": {
1316                            "protocolVersion": "2024-11-05",
1317                            "capabilities": {},
1318                            "serverInfo": {
1319                                "name": "test-server",
1320                                "version": "1.0.0"
1321                            }
1322                        }
1323                    }))
1324                    .unwrap(),
1325                ))
1326                .unwrap();
1327            Ok(response)
1328        });
1329        cx.update(|cx| cx.set_http_client(client));
1330        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1331        let store = cx.new(|cx| {
1332            ContextServerStore::test_maintain_server_loop(
1333                None,
1334                registry.clone(),
1335                project.read(cx).worktree_store(),
1336                project.downgrade(),
1337                cx,
1338            )
1339        });
1340
1341        let _server_events = assert_server_events(
1342            &store,
1343            vec![
1344                (server_id.clone(), ContextServerStatus::Starting),
1345                (server_id.clone(), ContextServerStatus::Running),
1346            ],
1347            cx,
1348        );
1349        cx.run_until_parked();
1350    }
1351
1352    struct ServerEvents {
1353        received_event_count: Rc<RefCell<usize>>,
1354        expected_event_count: usize,
1355        _subscription: Subscription,
1356    }
1357
1358    impl Drop for ServerEvents {
1359        fn drop(&mut self) {
1360            let actual_event_count = *self.received_event_count.borrow();
1361            assert_eq!(
1362                actual_event_count, self.expected_event_count,
1363                "
1364                Expected to receive {} context server store events, but received {} events",
1365                self.expected_event_count, actual_event_count
1366            );
1367        }
1368    }
1369
1370    #[gpui::test]
1371    async fn test_context_server_global_timeout(cx: &mut TestAppContext) {
1372        // Configure global timeout to 90 seconds
1373        cx.update(|cx| {
1374            let settings_store = SettingsStore::test(cx);
1375            cx.set_global(settings_store);
1376            SettingsStore::update_global(cx, |store, cx| {
1377                store
1378                    .set_user_settings(r#"{"context_server_timeout": 90000}"#, cx)
1379                    .expect("Failed to set test user settings");
1380            });
1381        });
1382
1383        let (_fs, project) = setup_context_server_test(cx, json!({"code.rs": ""}), vec![]).await;
1384
1385        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1386        let store = cx.new(|cx| {
1387            ContextServerStore::test(
1388                registry.clone(),
1389                project.read(cx).worktree_store(),
1390                project.downgrade(),
1391                cx,
1392            )
1393        });
1394
1395        // Test that create_context_server applies global timeout
1396        let result = store.update(cx, |store, cx| {
1397            store.create_context_server(
1398                ContextServerId("test-server".into()),
1399                Arc::new(ContextServerConfiguration::Http {
1400                    url: url::Url::parse("http://localhost:8080")
1401                        .expect("Failed to parse test URL"),
1402                    headers: Default::default(),
1403                    timeout: None, // Should use global timeout of 90 seconds
1404                }),
1405                cx,
1406            )
1407        });
1408
1409        assert!(
1410            result.is_ok(),
1411            "Server should be created successfully with global timeout"
1412        );
1413    }
1414
1415    #[gpui::test]
1416    async fn test_context_server_per_server_timeout_override(cx: &mut TestAppContext) {
1417        const SERVER_ID: &str = "test-server";
1418
1419        // Configure global timeout to 60 seconds
1420        cx.update(|cx| {
1421            let settings_store = SettingsStore::test(cx);
1422            cx.set_global(settings_store);
1423            SettingsStore::update_global(cx, |store, cx| {
1424                store
1425                    .set_user_settings(r#"{"context_server_timeout": 60000}"#, cx)
1426                    .expect("Failed to set test user settings");
1427            });
1428        });
1429
1430        let (_fs, project) = setup_context_server_test(
1431            cx,
1432            json!({"code.rs": ""}),
1433            vec![(
1434                SERVER_ID.into(),
1435                ContextServerSettings::Http {
1436                    enabled: true,
1437                    url: "http://localhost:8080".to_string(),
1438                    headers: Default::default(),
1439                    timeout: Some(120000), // Override to 120 seconds
1440                },
1441            )],
1442        )
1443        .await;
1444
1445        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1446        let store = cx.new(|cx| {
1447            ContextServerStore::test(
1448                registry.clone(),
1449                project.read(cx).worktree_store(),
1450                project.downgrade(),
1451                cx,
1452            )
1453        });
1454
1455        // Test that create_context_server applies per-server timeout override
1456        let result = store.update(cx, |store, cx| {
1457            store.create_context_server(
1458                ContextServerId("test-server".into()),
1459                Arc::new(ContextServerConfiguration::Http {
1460                    url: url::Url::parse("http://localhost:8080")
1461                        .expect("Failed to parse test URL"),
1462                    headers: Default::default(),
1463                    timeout: Some(120000), // Override: should use 120 seconds, not global 60
1464                }),
1465                cx,
1466            )
1467        });
1468
1469        assert!(
1470            result.is_ok(),
1471            "Server should be created successfully with per-server timeout override"
1472        );
1473    }
1474
1475    #[gpui::test]
1476    async fn test_context_server_stdio_timeout(cx: &mut TestAppContext) {
1477        const SERVER_ID: &str = "stdio-server";
1478
1479        let (_fs, project) = setup_context_server_test(
1480            cx,
1481            json!({"code.rs": ""}),
1482            vec![(
1483                SERVER_ID.into(),
1484                ContextServerSettings::Stdio {
1485                    enabled: true,
1486                    command: ContextServerCommand {
1487                        path: "/usr/bin/node".into(),
1488                        args: vec!["server.js".into()],
1489                        env: None,
1490                        timeout: Some(180000), // 3 minutes
1491                    },
1492                },
1493            )],
1494        )
1495        .await;
1496
1497        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1498        let store = cx.new(|cx| {
1499            ContextServerStore::test(
1500                registry.clone(),
1501                project.read(cx).worktree_store(),
1502                project.downgrade(),
1503                cx,
1504            )
1505        });
1506
1507        // Test that create_context_server works with stdio timeout
1508        let result = store.update(cx, |store, cx| {
1509            store.create_context_server(
1510                ContextServerId("stdio-server".into()),
1511                Arc::new(ContextServerConfiguration::Custom {
1512                    command: ContextServerCommand {
1513                        path: "/usr/bin/node".into(),
1514                        args: vec!["server.js".into()],
1515                        env: None,
1516                        timeout: Some(180000), // 3 minutes
1517                    },
1518                }),
1519                cx,
1520            )
1521        });
1522
1523        assert!(
1524            result.is_ok(),
1525            "Stdio server should be created successfully with timeout"
1526        );
1527    }
1528
1529    fn dummy_server_settings() -> ContextServerSettings {
1530        ContextServerSettings::Stdio {
1531            enabled: true,
1532            command: ContextServerCommand {
1533                path: "somebinary".into(),
1534                args: vec!["arg".to_string()],
1535                env: None,
1536                timeout: None,
1537            },
1538        }
1539    }
1540
1541    fn assert_server_events(
1542        store: &Entity<ContextServerStore>,
1543        expected_events: Vec<(ContextServerId, ContextServerStatus)>,
1544        cx: &mut TestAppContext,
1545    ) -> ServerEvents {
1546        cx.update(|cx| {
1547            let mut ix = 0;
1548            let received_event_count = Rc::new(RefCell::new(0));
1549            let expected_event_count = expected_events.len();
1550            let subscription = cx.subscribe(store, {
1551                let received_event_count = received_event_count.clone();
1552                move |_, event, _| match event {
1553                    Event::ServerStatusChanged {
1554                        server_id: actual_server_id,
1555                        status: actual_status,
1556                    } => {
1557                        let (expected_server_id, expected_status) = &expected_events[ix];
1558
1559                        assert_eq!(
1560                            actual_server_id, expected_server_id,
1561                            "Expected different server id at index {}",
1562                            ix
1563                        );
1564                        assert_eq!(
1565                            actual_status, expected_status,
1566                            "Expected different status at index {}",
1567                            ix
1568                        );
1569                        ix += 1;
1570                        *received_event_count.borrow_mut() += 1;
1571                    }
1572                }
1573            });
1574            ServerEvents {
1575                expected_event_count,
1576                received_event_count,
1577                _subscription: subscription,
1578            }
1579        })
1580    }
1581
1582    async fn setup_context_server_test(
1583        cx: &mut TestAppContext,
1584        files: serde_json::Value,
1585        context_server_configurations: Vec<(Arc<str>, ContextServerSettings)>,
1586    ) -> (Arc<FakeFs>, Entity<Project>) {
1587        cx.update(|cx| {
1588            let settings_store = SettingsStore::test(cx);
1589            cx.set_global(settings_store);
1590            let mut settings = ProjectSettings::get_global(cx).clone();
1591            for (id, config) in context_server_configurations {
1592                settings.context_servers.insert(id, config);
1593            }
1594            ProjectSettings::override_global(settings, cx);
1595        });
1596
1597        let fs = FakeFs::new(cx.executor());
1598        fs.insert_tree(path!("/test"), files).await;
1599        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
1600
1601        (fs, project)
1602    }
1603
1604    struct FakeContextServerDescriptor {
1605        path: PathBuf,
1606    }
1607
1608    impl FakeContextServerDescriptor {
1609        fn new(path: impl Into<PathBuf>) -> Self {
1610            Self { path: path.into() }
1611        }
1612    }
1613
1614    impl ContextServerDescriptor for FakeContextServerDescriptor {
1615        fn command(
1616            &self,
1617            _worktree_store: Entity<WorktreeStore>,
1618            _cx: &AsyncApp,
1619        ) -> Task<Result<ContextServerCommand>> {
1620            Task::ready(Ok(ContextServerCommand {
1621                path: self.path.clone(),
1622                args: vec!["arg1".to_string(), "arg2".to_string()],
1623                env: None,
1624                timeout: None,
1625            }))
1626        }
1627
1628        fn configuration(
1629            &self,
1630            _worktree_store: Entity<WorktreeStore>,
1631            _cx: &AsyncApp,
1632        ) -> Task<Result<Option<::extension::ContextServerConfiguration>>> {
1633            Task::ready(Ok(None))
1634        }
1635    }
1636}