context_server_store.rs

   1pub mod extension;
   2pub mod registry;
   3
   4use std::sync::Arc;
   5use std::time::Duration;
   6
   7use anyhow::{Context as _, Result};
   8use collections::{HashMap, HashSet};
   9use context_server::{ContextServer, ContextServerCommand, ContextServerId};
  10use futures::{FutureExt as _, future::join_all};
  11use gpui::{App, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, actions};
  12use registry::ContextServerDescriptorRegistry;
  13use settings::{Settings as _, SettingsStore};
  14use util::{ResultExt as _, rel_path::RelPath};
  15
  16use crate::{
  17    Project,
  18    project_settings::{ContextServerSettings, ProjectSettings},
  19    worktree_store::WorktreeStore,
  20};
  21
  22/// Maximum timeout for context server requests
  23/// Prevents extremely large timeout values from tying up resources indefinitely.
  24const MAX_TIMEOUT_SECS: u64 = 600; // 10 minutes
  25
  26pub fn init(cx: &mut App) {
  27    extension::init(cx);
  28}
  29
  30actions!(
  31    context_server,
  32    [
  33        /// Restarts the context server.
  34        Restart
  35    ]
  36);
  37
  38#[derive(Debug, Clone, PartialEq, Eq, Hash)]
  39pub enum ContextServerStatus {
  40    Starting,
  41    Running,
  42    Stopped,
  43    Error(Arc<str>),
  44}
  45
  46impl ContextServerStatus {
  47    fn from_state(state: &ContextServerState) -> Self {
  48        match state {
  49            ContextServerState::Starting { .. } => ContextServerStatus::Starting,
  50            ContextServerState::Running { .. } => ContextServerStatus::Running,
  51            ContextServerState::Stopped { .. } => ContextServerStatus::Stopped,
  52            ContextServerState::Error { error, .. } => ContextServerStatus::Error(error.clone()),
  53        }
  54    }
  55}
  56
  57enum ContextServerState {
  58    Starting {
  59        server: Arc<ContextServer>,
  60        configuration: Arc<ContextServerConfiguration>,
  61        _task: Task<()>,
  62    },
  63    Running {
  64        server: Arc<ContextServer>,
  65        configuration: Arc<ContextServerConfiguration>,
  66    },
  67    Stopped {
  68        server: Arc<ContextServer>,
  69        configuration: Arc<ContextServerConfiguration>,
  70    },
  71    Error {
  72        server: Arc<ContextServer>,
  73        configuration: Arc<ContextServerConfiguration>,
  74        error: Arc<str>,
  75    },
  76}
  77
  78impl ContextServerState {
  79    pub fn server(&self) -> Arc<ContextServer> {
  80        match self {
  81            ContextServerState::Starting { server, .. } => server.clone(),
  82            ContextServerState::Running { server, .. } => server.clone(),
  83            ContextServerState::Stopped { server, .. } => server.clone(),
  84            ContextServerState::Error { server, .. } => server.clone(),
  85        }
  86    }
  87
  88    pub fn configuration(&self) -> Arc<ContextServerConfiguration> {
  89        match self {
  90            ContextServerState::Starting { configuration, .. } => configuration.clone(),
  91            ContextServerState::Running { configuration, .. } => configuration.clone(),
  92            ContextServerState::Stopped { configuration, .. } => configuration.clone(),
  93            ContextServerState::Error { configuration, .. } => configuration.clone(),
  94        }
  95    }
  96}
  97
  98#[derive(Debug, PartialEq, Eq)]
  99pub enum ContextServerConfiguration {
 100    Custom {
 101        command: ContextServerCommand,
 102    },
 103    Extension {
 104        command: ContextServerCommand,
 105        settings: serde_json::Value,
 106    },
 107    Http {
 108        url: url::Url,
 109        headers: HashMap<String, String>,
 110        timeout: Option<u64>,
 111    },
 112}
 113
 114impl ContextServerConfiguration {
 115    pub fn command(&self) -> Option<&ContextServerCommand> {
 116        match self {
 117            ContextServerConfiguration::Custom { command } => Some(command),
 118            ContextServerConfiguration::Extension { command, .. } => Some(command),
 119            ContextServerConfiguration::Http { .. } => None,
 120        }
 121    }
 122
 123    pub async fn from_settings(
 124        settings: ContextServerSettings,
 125        id: ContextServerId,
 126        registry: Entity<ContextServerDescriptorRegistry>,
 127        worktree_store: Entity<WorktreeStore>,
 128        cx: &AsyncApp,
 129    ) -> Option<Self> {
 130        match settings {
 131            ContextServerSettings::Stdio {
 132                enabled: _,
 133                command,
 134            } => Some(ContextServerConfiguration::Custom { command }),
 135            ContextServerSettings::Extension {
 136                enabled: _,
 137                settings,
 138            } => {
 139                let descriptor =
 140                    cx.update(|cx| registry.read(cx).context_server_descriptor(&id.0))?;
 141
 142                match descriptor.command(worktree_store, cx).await {
 143                    Ok(command) => {
 144                        Some(ContextServerConfiguration::Extension { command, settings })
 145                    }
 146                    Err(e) => {
 147                        log::error!(
 148                            "Failed to create context server configuration from settings: {e:#}"
 149                        );
 150                        None
 151                    }
 152                }
 153            }
 154            ContextServerSettings::Http {
 155                enabled: _,
 156                url,
 157                headers: auth,
 158                timeout,
 159            } => {
 160                let url = url::Url::parse(&url).log_err()?;
 161                Some(ContextServerConfiguration::Http {
 162                    url,
 163                    headers: auth,
 164                    timeout,
 165                })
 166            }
 167        }
 168    }
 169}
 170
 171pub type ContextServerFactory =
 172    Box<dyn Fn(ContextServerId, Arc<ContextServerConfiguration>) -> Arc<ContextServer>>;
 173
 174pub struct ContextServerStore {
 175    context_server_settings: HashMap<Arc<str>, ContextServerSettings>,
 176    servers: HashMap<ContextServerId, ContextServerState>,
 177    worktree_store: Entity<WorktreeStore>,
 178    project: WeakEntity<Project>,
 179    registry: Entity<ContextServerDescriptorRegistry>,
 180    update_servers_task: Option<Task<Result<()>>>,
 181    context_server_factory: Option<ContextServerFactory>,
 182    needs_server_update: bool,
 183    _subscriptions: Vec<Subscription>,
 184}
 185
 186pub enum Event {
 187    ServerStatusChanged {
 188        server_id: ContextServerId,
 189        status: ContextServerStatus,
 190    },
 191}
 192
 193impl EventEmitter<Event> for ContextServerStore {}
 194
 195impl ContextServerStore {
 196    pub fn new(
 197        worktree_store: Entity<WorktreeStore>,
 198        weak_project: WeakEntity<Project>,
 199        cx: &mut Context<Self>,
 200    ) -> Self {
 201        Self::new_internal(
 202            true,
 203            None,
 204            ContextServerDescriptorRegistry::default_global(cx),
 205            worktree_store,
 206            weak_project,
 207            cx,
 208        )
 209    }
 210
 211    /// Returns all configured context server ids, excluding the ones that are disabled
 212    pub fn configured_server_ids(&self) -> Vec<ContextServerId> {
 213        self.context_server_settings
 214            .iter()
 215            .filter(|(_, settings)| settings.enabled())
 216            .map(|(id, _)| ContextServerId(id.clone()))
 217            .collect()
 218    }
 219
 220    #[cfg(any(test, feature = "test-support"))]
 221    pub fn test(
 222        registry: Entity<ContextServerDescriptorRegistry>,
 223        worktree_store: Entity<WorktreeStore>,
 224        weak_project: WeakEntity<Project>,
 225        cx: &mut Context<Self>,
 226    ) -> Self {
 227        Self::new_internal(false, None, registry, worktree_store, weak_project, cx)
 228    }
 229
 230    #[cfg(any(test, feature = "test-support"))]
 231    pub fn test_maintain_server_loop(
 232        context_server_factory: Option<ContextServerFactory>,
 233        registry: Entity<ContextServerDescriptorRegistry>,
 234        worktree_store: Entity<WorktreeStore>,
 235        weak_project: WeakEntity<Project>,
 236        cx: &mut Context<Self>,
 237    ) -> Self {
 238        Self::new_internal(
 239            true,
 240            context_server_factory,
 241            registry,
 242            worktree_store,
 243            weak_project,
 244            cx,
 245        )
 246    }
 247
 248    fn new_internal(
 249        maintain_server_loop: bool,
 250        context_server_factory: Option<ContextServerFactory>,
 251        registry: Entity<ContextServerDescriptorRegistry>,
 252        worktree_store: Entity<WorktreeStore>,
 253        weak_project: WeakEntity<Project>,
 254        cx: &mut Context<Self>,
 255    ) -> Self {
 256        let subscriptions = if maintain_server_loop {
 257            vec![
 258                cx.observe(&registry, |this, _registry, cx| {
 259                    this.available_context_servers_changed(cx);
 260                }),
 261                cx.observe_global::<SettingsStore>(|this, cx| {
 262                    let settings =
 263                        &Self::resolve_project_settings(&this.worktree_store, cx).context_servers;
 264                    if &this.context_server_settings == settings {
 265                        return;
 266                    }
 267                    this.context_server_settings = settings.clone();
 268                    this.available_context_servers_changed(cx);
 269                }),
 270            ]
 271        } else {
 272            Vec::new()
 273        };
 274
 275        let mut this = Self {
 276            _subscriptions: subscriptions,
 277            context_server_settings: Self::resolve_project_settings(&worktree_store, cx)
 278                .context_servers
 279                .clone(),
 280            worktree_store,
 281            project: weak_project,
 282            registry,
 283            needs_server_update: false,
 284            servers: HashMap::default(),
 285            update_servers_task: None,
 286            context_server_factory,
 287        };
 288        if maintain_server_loop {
 289            this.available_context_servers_changed(cx);
 290        }
 291        this
 292    }
 293
 294    pub fn get_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
 295        self.servers.get(id).map(|state| state.server())
 296    }
 297
 298    pub fn get_running_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
 299        if let Some(ContextServerState::Running { server, .. }) = self.servers.get(id) {
 300            Some(server.clone())
 301        } else {
 302            None
 303        }
 304    }
 305
 306    pub fn status_for_server(&self, id: &ContextServerId) -> Option<ContextServerStatus> {
 307        self.servers.get(id).map(ContextServerStatus::from_state)
 308    }
 309
 310    pub fn configuration_for_server(
 311        &self,
 312        id: &ContextServerId,
 313    ) -> Option<Arc<ContextServerConfiguration>> {
 314        self.servers.get(id).map(|state| state.configuration())
 315    }
 316
 317    pub fn server_ids(&self, cx: &App) -> HashSet<ContextServerId> {
 318        self.servers
 319            .keys()
 320            .cloned()
 321            .chain(
 322                self.registry
 323                    .read(cx)
 324                    .context_server_descriptors()
 325                    .into_iter()
 326                    .map(|(id, _)| ContextServerId(id)),
 327            )
 328            .collect()
 329    }
 330
 331    pub fn running_servers(&self) -> Vec<Arc<ContextServer>> {
 332        self.servers
 333            .values()
 334            .filter_map(|state| {
 335                if let ContextServerState::Running { server, .. } = state {
 336                    Some(server.clone())
 337                } else {
 338                    None
 339                }
 340            })
 341            .collect()
 342    }
 343
 344    pub fn start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
 345        cx.spawn(async move |this, cx| {
 346            let this = this.upgrade().context("Context server store dropped")?;
 347            let settings = this
 348                .update(cx, |this, _| {
 349                    this.context_server_settings.get(&server.id().0).cloned()
 350                })
 351                .context("Failed to get context server settings")?;
 352
 353            if !settings.enabled() {
 354                return anyhow::Ok(());
 355            }
 356
 357            let (registry, worktree_store) = this.update(cx, |this, _| {
 358                (this.registry.clone(), this.worktree_store.clone())
 359            });
 360            let configuration = ContextServerConfiguration::from_settings(
 361                settings,
 362                server.id(),
 363                registry,
 364                worktree_store,
 365                cx,
 366            )
 367            .await
 368            .context("Failed to create context server configuration")?;
 369
 370            this.update(cx, |this, cx| {
 371                this.run_server(server, Arc::new(configuration), cx)
 372            });
 373            Ok(())
 374        })
 375        .detach_and_log_err(cx);
 376    }
 377
 378    pub fn stop_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
 379        if matches!(
 380            self.servers.get(id),
 381            Some(ContextServerState::Stopped { .. })
 382        ) {
 383            return Ok(());
 384        }
 385
 386        let state = self
 387            .servers
 388            .remove(id)
 389            .context("Context server not found")?;
 390
 391        let server = state.server();
 392        let configuration = state.configuration();
 393        let mut result = Ok(());
 394        if let ContextServerState::Running { server, .. } = &state {
 395            result = server.stop();
 396        }
 397        drop(state);
 398
 399        self.update_server_state(
 400            id.clone(),
 401            ContextServerState::Stopped {
 402                configuration,
 403                server,
 404            },
 405            cx,
 406        );
 407
 408        result
 409    }
 410
 411    fn run_server(
 412        &mut self,
 413        server: Arc<ContextServer>,
 414        configuration: Arc<ContextServerConfiguration>,
 415        cx: &mut Context<Self>,
 416    ) {
 417        let id = server.id();
 418        if matches!(
 419            self.servers.get(&id),
 420            Some(ContextServerState::Starting { .. } | ContextServerState::Running { .. })
 421        ) {
 422            self.stop_server(&id, cx).log_err();
 423        }
 424        let task = cx.spawn({
 425            let id = server.id();
 426            let server = server.clone();
 427            let configuration = configuration.clone();
 428
 429            async move |this, cx| {
 430                match server.clone().start(cx).await {
 431                    Ok(_) => {
 432                        debug_assert!(server.client().is_some());
 433
 434                        this.update(cx, |this, cx| {
 435                            this.update_server_state(
 436                                id.clone(),
 437                                ContextServerState::Running {
 438                                    server,
 439                                    configuration,
 440                                },
 441                                cx,
 442                            )
 443                        })
 444                        .log_err()
 445                    }
 446                    Err(err) => {
 447                        log::error!("{} context server failed to start: {}", id, err);
 448                        this.update(cx, |this, cx| {
 449                            this.update_server_state(
 450                                id.clone(),
 451                                ContextServerState::Error {
 452                                    configuration,
 453                                    server,
 454                                    error: err.to_string().into(),
 455                                },
 456                                cx,
 457                            )
 458                        })
 459                        .log_err()
 460                    }
 461                };
 462            }
 463        });
 464
 465        self.update_server_state(
 466            id.clone(),
 467            ContextServerState::Starting {
 468                configuration,
 469                _task: task,
 470                server,
 471            },
 472            cx,
 473        );
 474    }
 475
 476    fn remove_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
 477        let state = self
 478            .servers
 479            .remove(id)
 480            .context("Context server not found")?;
 481        drop(state);
 482        cx.emit(Event::ServerStatusChanged {
 483            server_id: id.clone(),
 484            status: ContextServerStatus::Stopped,
 485        });
 486        Ok(())
 487    }
 488
 489    fn create_context_server(
 490        &self,
 491        id: ContextServerId,
 492        configuration: Arc<ContextServerConfiguration>,
 493        cx: &mut Context<Self>,
 494    ) -> Result<Arc<ContextServer>> {
 495        let global_timeout =
 496            Self::resolve_project_settings(&self.worktree_store, cx).context_server_timeout;
 497
 498        if let Some(factory) = self.context_server_factory.as_ref() {
 499            return Ok(factory(id, configuration));
 500        }
 501
 502        match configuration.as_ref() {
 503            ContextServerConfiguration::Http {
 504                url,
 505                headers,
 506                timeout,
 507            } => Ok(Arc::new(ContextServer::http(
 508                id,
 509                url,
 510                headers.clone(),
 511                cx.http_client(),
 512                cx.background_executor().clone(),
 513                Some(Duration::from_secs(
 514                    timeout.unwrap_or(global_timeout).min(MAX_TIMEOUT_SECS),
 515                )),
 516            )?)),
 517            _ => {
 518                let root_path = self
 519                    .project
 520                    .read_with(cx, |project, cx| project.active_project_directory(cx))
 521                    .ok()
 522                    .flatten()
 523                    .or_else(|| {
 524                        self.worktree_store.read_with(cx, |store, cx| {
 525                            store.visible_worktrees(cx).fold(None, |acc, item| {
 526                                if acc.is_none() {
 527                                    item.read(cx).root_dir()
 528                                } else {
 529                                    acc
 530                                }
 531                            })
 532                        })
 533                    });
 534
 535                let mut command = configuration
 536                    .command()
 537                    .context("Missing command configuration for stdio context server")?
 538                    .clone();
 539                command.timeout = Some(
 540                    command
 541                        .timeout
 542                        .unwrap_or(global_timeout)
 543                        .min(MAX_TIMEOUT_SECS),
 544                );
 545
 546                Ok(Arc::new(ContextServer::stdio(id, command, root_path)))
 547            }
 548        }
 549    }
 550
 551    fn resolve_project_settings<'a>(
 552        worktree_store: &'a Entity<WorktreeStore>,
 553        cx: &'a App,
 554    ) -> &'a ProjectSettings {
 555        let location = worktree_store
 556            .read(cx)
 557            .visible_worktrees(cx)
 558            .next()
 559            .map(|worktree| settings::SettingsLocation {
 560                worktree_id: worktree.read(cx).id(),
 561                path: RelPath::empty(),
 562            });
 563        ProjectSettings::get(location, cx)
 564    }
 565
 566    fn update_server_state(
 567        &mut self,
 568        id: ContextServerId,
 569        state: ContextServerState,
 570        cx: &mut Context<Self>,
 571    ) {
 572        let status = ContextServerStatus::from_state(&state);
 573        self.servers.insert(id.clone(), state);
 574        cx.emit(Event::ServerStatusChanged {
 575            server_id: id,
 576            status,
 577        });
 578    }
 579
 580    fn available_context_servers_changed(&mut self, cx: &mut Context<Self>) {
 581        if self.update_servers_task.is_some() {
 582            self.needs_server_update = true;
 583        } else {
 584            self.needs_server_update = false;
 585            self.update_servers_task = Some(cx.spawn(async move |this, cx| {
 586                if let Err(err) = Self::maintain_servers(this.clone(), cx).await {
 587                    log::error!("Error maintaining context servers: {}", err);
 588                }
 589
 590                this.update(cx, |this, cx| {
 591                    this.update_servers_task.take();
 592                    if this.needs_server_update {
 593                        this.available_context_servers_changed(cx);
 594                    }
 595                })?;
 596
 597                Ok(())
 598            }));
 599        }
 600    }
 601
 602    async fn maintain_servers(this: WeakEntity<Self>, cx: &mut AsyncApp) -> Result<()> {
 603        let (mut configured_servers, registry, worktree_store) = this.update(cx, |this, _| {
 604            (
 605                this.context_server_settings.clone(),
 606                this.registry.clone(),
 607                this.worktree_store.clone(),
 608            )
 609        })?;
 610
 611        for (id, _) in registry.read_with(cx, |registry, _| registry.context_server_descriptors()) {
 612            configured_servers
 613                .entry(id)
 614                .or_insert(ContextServerSettings::default_extension());
 615        }
 616
 617        let (enabled_servers, disabled_servers): (HashMap<_, _>, HashMap<_, _>) =
 618            configured_servers
 619                .into_iter()
 620                .partition(|(_, settings)| settings.enabled());
 621
 622        let configured_servers = join_all(enabled_servers.into_iter().map(|(id, settings)| {
 623            let id = ContextServerId(id);
 624            ContextServerConfiguration::from_settings(
 625                settings,
 626                id.clone(),
 627                registry.clone(),
 628                worktree_store.clone(),
 629                cx,
 630            )
 631            .map(|config| (id, config))
 632        }))
 633        .await
 634        .into_iter()
 635        .filter_map(|(id, config)| config.map(|config| (id, config)))
 636        .collect::<HashMap<_, _>>();
 637
 638        let mut servers_to_start = Vec::new();
 639        let mut servers_to_remove = HashSet::default();
 640        let mut servers_to_stop = HashSet::default();
 641
 642        this.update(cx, |this, cx| {
 643            for server_id in this.servers.keys() {
 644                // All servers that are not in desired_servers should be removed from the store.
 645                // This can happen if the user removed a server from the context server settings.
 646                if !configured_servers.contains_key(server_id) {
 647                    if disabled_servers.contains_key(&server_id.0) {
 648                        servers_to_stop.insert(server_id.clone());
 649                    } else {
 650                        servers_to_remove.insert(server_id.clone());
 651                    }
 652                }
 653            }
 654
 655            for (id, config) in configured_servers {
 656                let state = this.servers.get(&id);
 657                let is_stopped = matches!(state, Some(ContextServerState::Stopped { .. }));
 658                let existing_config = state.as_ref().map(|state| state.configuration());
 659                if existing_config.as_deref() != Some(&config) || is_stopped {
 660                    let config = Arc::new(config);
 661                    let server = this.create_context_server(id.clone(), config.clone(), cx)?;
 662                    servers_to_start.push((server, config));
 663                    if this.servers.contains_key(&id) {
 664                        servers_to_stop.insert(id);
 665                    }
 666                }
 667            }
 668
 669            anyhow::Ok(())
 670        })??;
 671
 672        this.update(cx, |this, cx| {
 673            for id in servers_to_stop {
 674                this.stop_server(&id, cx)?;
 675            }
 676            for id in servers_to_remove {
 677                this.remove_server(&id, cx)?;
 678            }
 679            for (server, config) in servers_to_start {
 680                this.run_server(server, config, cx);
 681            }
 682            anyhow::Ok(())
 683        })?
 684    }
 685}
 686
 687#[cfg(test)]
 688mod tests {
 689    use super::*;
 690    use crate::{
 691        FakeFs, Project, context_server_store::registry::ContextServerDescriptor,
 692        project_settings::ProjectSettings,
 693    };
 694    use context_server::test::create_fake_transport;
 695    use gpui::{AppContext, TestAppContext, UpdateGlobal as _};
 696    use http_client::{FakeHttpClient, Response};
 697    use serde_json::json;
 698    use std::{cell::RefCell, path::PathBuf, rc::Rc};
 699    use util::path;
 700
 701    #[gpui::test]
 702    async fn test_context_server_status(cx: &mut TestAppContext) {
 703        const SERVER_1_ID: &str = "mcp-1";
 704        const SERVER_2_ID: &str = "mcp-2";
 705
 706        let (_fs, project) = setup_context_server_test(
 707            cx,
 708            json!({"code.rs": ""}),
 709            vec![
 710                (SERVER_1_ID.into(), dummy_server_settings()),
 711                (SERVER_2_ID.into(), dummy_server_settings()),
 712            ],
 713        )
 714        .await;
 715
 716        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
 717        let store = cx.new(|cx| {
 718            ContextServerStore::test(
 719                registry.clone(),
 720                project.read(cx).worktree_store(),
 721                project.downgrade(),
 722                cx,
 723            )
 724        });
 725
 726        let server_1_id = ContextServerId(SERVER_1_ID.into());
 727        let server_2_id = ContextServerId(SERVER_2_ID.into());
 728
 729        let server_1 = Arc::new(ContextServer::new(
 730            server_1_id.clone(),
 731            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
 732        ));
 733        let server_2 = Arc::new(ContextServer::new(
 734            server_2_id.clone(),
 735            Arc::new(create_fake_transport(SERVER_2_ID, cx.executor())),
 736        ));
 737
 738        store.update(cx, |store, cx| store.start_server(server_1, cx));
 739
 740        cx.run_until_parked();
 741
 742        cx.update(|cx| {
 743            assert_eq!(
 744                store.read(cx).status_for_server(&server_1_id),
 745                Some(ContextServerStatus::Running)
 746            );
 747            assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
 748        });
 749
 750        store.update(cx, |store, cx| store.start_server(server_2.clone(), cx));
 751
 752        cx.run_until_parked();
 753
 754        cx.update(|cx| {
 755            assert_eq!(
 756                store.read(cx).status_for_server(&server_1_id),
 757                Some(ContextServerStatus::Running)
 758            );
 759            assert_eq!(
 760                store.read(cx).status_for_server(&server_2_id),
 761                Some(ContextServerStatus::Running)
 762            );
 763        });
 764
 765        store
 766            .update(cx, |store, cx| store.stop_server(&server_2_id, cx))
 767            .unwrap();
 768
 769        cx.update(|cx| {
 770            assert_eq!(
 771                store.read(cx).status_for_server(&server_1_id),
 772                Some(ContextServerStatus::Running)
 773            );
 774            assert_eq!(
 775                store.read(cx).status_for_server(&server_2_id),
 776                Some(ContextServerStatus::Stopped)
 777            );
 778        });
 779    }
 780
 781    #[gpui::test]
 782    async fn test_context_server_status_events(cx: &mut TestAppContext) {
 783        const SERVER_1_ID: &str = "mcp-1";
 784        const SERVER_2_ID: &str = "mcp-2";
 785
 786        let (_fs, project) = setup_context_server_test(
 787            cx,
 788            json!({"code.rs": ""}),
 789            vec![
 790                (SERVER_1_ID.into(), dummy_server_settings()),
 791                (SERVER_2_ID.into(), dummy_server_settings()),
 792            ],
 793        )
 794        .await;
 795
 796        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
 797        let store = cx.new(|cx| {
 798            ContextServerStore::test(
 799                registry.clone(),
 800                project.read(cx).worktree_store(),
 801                project.downgrade(),
 802                cx,
 803            )
 804        });
 805
 806        let server_1_id = ContextServerId(SERVER_1_ID.into());
 807        let server_2_id = ContextServerId(SERVER_2_ID.into());
 808
 809        let server_1 = Arc::new(ContextServer::new(
 810            server_1_id.clone(),
 811            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
 812        ));
 813        let server_2 = Arc::new(ContextServer::new(
 814            server_2_id.clone(),
 815            Arc::new(create_fake_transport(SERVER_2_ID, cx.executor())),
 816        ));
 817
 818        let _server_events = assert_server_events(
 819            &store,
 820            vec![
 821                (server_1_id.clone(), ContextServerStatus::Starting),
 822                (server_1_id, ContextServerStatus::Running),
 823                (server_2_id.clone(), ContextServerStatus::Starting),
 824                (server_2_id.clone(), ContextServerStatus::Running),
 825                (server_2_id.clone(), ContextServerStatus::Stopped),
 826            ],
 827            cx,
 828        );
 829
 830        store.update(cx, |store, cx| store.start_server(server_1, cx));
 831
 832        cx.run_until_parked();
 833
 834        store.update(cx, |store, cx| store.start_server(server_2.clone(), cx));
 835
 836        cx.run_until_parked();
 837
 838        store
 839            .update(cx, |store, cx| store.stop_server(&server_2_id, cx))
 840            .unwrap();
 841    }
 842
 843    #[gpui::test(iterations = 25)]
 844    async fn test_context_server_concurrent_starts(cx: &mut TestAppContext) {
 845        const SERVER_1_ID: &str = "mcp-1";
 846
 847        let (_fs, project) = setup_context_server_test(
 848            cx,
 849            json!({"code.rs": ""}),
 850            vec![(SERVER_1_ID.into(), dummy_server_settings())],
 851        )
 852        .await;
 853
 854        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
 855        let store = cx.new(|cx| {
 856            ContextServerStore::test(
 857                registry.clone(),
 858                project.read(cx).worktree_store(),
 859                project.downgrade(),
 860                cx,
 861            )
 862        });
 863
 864        let server_id = ContextServerId(SERVER_1_ID.into());
 865
 866        let server_with_same_id_1 = Arc::new(ContextServer::new(
 867            server_id.clone(),
 868            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
 869        ));
 870        let server_with_same_id_2 = Arc::new(ContextServer::new(
 871            server_id.clone(),
 872            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
 873        ));
 874
 875        // If we start another server with the same id, we should report that we stopped the previous one
 876        let _server_events = assert_server_events(
 877            &store,
 878            vec![
 879                (server_id.clone(), ContextServerStatus::Starting),
 880                (server_id.clone(), ContextServerStatus::Stopped),
 881                (server_id.clone(), ContextServerStatus::Starting),
 882                (server_id.clone(), ContextServerStatus::Running),
 883            ],
 884            cx,
 885        );
 886
 887        store.update(cx, |store, cx| {
 888            store.start_server(server_with_same_id_1.clone(), cx)
 889        });
 890        store.update(cx, |store, cx| {
 891            store.start_server(server_with_same_id_2.clone(), cx)
 892        });
 893
 894        cx.run_until_parked();
 895
 896        cx.update(|cx| {
 897            assert_eq!(
 898                store.read(cx).status_for_server(&server_id),
 899                Some(ContextServerStatus::Running)
 900            );
 901        });
 902    }
 903
 904    #[gpui::test]
 905    async fn test_context_server_maintain_servers_loop(cx: &mut TestAppContext) {
 906        const SERVER_1_ID: &str = "mcp-1";
 907        const SERVER_2_ID: &str = "mcp-2";
 908
 909        let server_1_id = ContextServerId(SERVER_1_ID.into());
 910        let server_2_id = ContextServerId(SERVER_2_ID.into());
 911
 912        let fake_descriptor_1 = Arc::new(FakeContextServerDescriptor::new(SERVER_1_ID));
 913
 914        let (_fs, project) = setup_context_server_test(
 915            cx,
 916            json!({"code.rs": ""}),
 917            vec![(
 918                SERVER_1_ID.into(),
 919                ContextServerSettings::Extension {
 920                    enabled: true,
 921                    settings: json!({
 922                        "somevalue": true
 923                    }),
 924                },
 925            )],
 926        )
 927        .await;
 928
 929        let executor = cx.executor();
 930        let registry = cx.new(|cx| {
 931            let mut registry = ContextServerDescriptorRegistry::new();
 932            registry.register_context_server_descriptor(SERVER_1_ID.into(), fake_descriptor_1, cx);
 933            registry
 934        });
 935        let store = cx.new(|cx| {
 936            ContextServerStore::test_maintain_server_loop(
 937                Some(Box::new(move |id, _| {
 938                    Arc::new(ContextServer::new(
 939                        id.clone(),
 940                        Arc::new(create_fake_transport(id.0.to_string(), executor.clone())),
 941                    ))
 942                })),
 943                registry.clone(),
 944                project.read(cx).worktree_store(),
 945                project.downgrade(),
 946                cx,
 947            )
 948        });
 949
 950        // Ensure that mcp-1 starts up
 951        {
 952            let _server_events = assert_server_events(
 953                &store,
 954                vec![
 955                    (server_1_id.clone(), ContextServerStatus::Starting),
 956                    (server_1_id.clone(), ContextServerStatus::Running),
 957                ],
 958                cx,
 959            );
 960            cx.run_until_parked();
 961        }
 962
 963        // Ensure that mcp-1 is restarted when the configuration was changed
 964        {
 965            let _server_events = assert_server_events(
 966                &store,
 967                vec![
 968                    (server_1_id.clone(), ContextServerStatus::Stopped),
 969                    (server_1_id.clone(), ContextServerStatus::Starting),
 970                    (server_1_id.clone(), ContextServerStatus::Running),
 971                ],
 972                cx,
 973            );
 974            set_context_server_configuration(
 975                vec![(
 976                    server_1_id.0.clone(),
 977                    settings::ContextServerSettingsContent::Extension {
 978                        enabled: true,
 979                        settings: json!({
 980                            "somevalue": false
 981                        }),
 982                    },
 983                )],
 984                cx,
 985            );
 986
 987            cx.run_until_parked();
 988        }
 989
 990        // Ensure that mcp-1 is not restarted when the configuration was not changed
 991        {
 992            let _server_events = assert_server_events(&store, vec![], cx);
 993            set_context_server_configuration(
 994                vec![(
 995                    server_1_id.0.clone(),
 996                    settings::ContextServerSettingsContent::Extension {
 997                        enabled: true,
 998                        settings: json!({
 999                            "somevalue": false
1000                        }),
1001                    },
1002                )],
1003                cx,
1004            );
1005
1006            cx.run_until_parked();
1007        }
1008
1009        // Ensure that mcp-2 is started once it is added to the settings
1010        {
1011            let _server_events = assert_server_events(
1012                &store,
1013                vec![
1014                    (server_2_id.clone(), ContextServerStatus::Starting),
1015                    (server_2_id.clone(), ContextServerStatus::Running),
1016                ],
1017                cx,
1018            );
1019            set_context_server_configuration(
1020                vec![
1021                    (
1022                        server_1_id.0.clone(),
1023                        settings::ContextServerSettingsContent::Extension {
1024                            enabled: true,
1025                            settings: json!({
1026                                "somevalue": false
1027                            }),
1028                        },
1029                    ),
1030                    (
1031                        server_2_id.0.clone(),
1032                        settings::ContextServerSettingsContent::Stdio {
1033                            enabled: true,
1034                            command: ContextServerCommand {
1035                                path: "somebinary".into(),
1036                                args: vec!["arg".to_string()],
1037                                env: None,
1038                                timeout: None,
1039                            },
1040                        },
1041                    ),
1042                ],
1043                cx,
1044            );
1045
1046            cx.run_until_parked();
1047        }
1048
1049        // Ensure that mcp-2 is restarted once the args have changed
1050        {
1051            let _server_events = assert_server_events(
1052                &store,
1053                vec![
1054                    (server_2_id.clone(), ContextServerStatus::Stopped),
1055                    (server_2_id.clone(), ContextServerStatus::Starting),
1056                    (server_2_id.clone(), ContextServerStatus::Running),
1057                ],
1058                cx,
1059            );
1060            set_context_server_configuration(
1061                vec![
1062                    (
1063                        server_1_id.0.clone(),
1064                        settings::ContextServerSettingsContent::Extension {
1065                            enabled: true,
1066                            settings: json!({
1067                                "somevalue": false
1068                            }),
1069                        },
1070                    ),
1071                    (
1072                        server_2_id.0.clone(),
1073                        settings::ContextServerSettingsContent::Stdio {
1074                            enabled: true,
1075                            command: ContextServerCommand {
1076                                path: "somebinary".into(),
1077                                args: vec!["anotherArg".to_string()],
1078                                env: None,
1079                                timeout: None,
1080                            },
1081                        },
1082                    ),
1083                ],
1084                cx,
1085            );
1086
1087            cx.run_until_parked();
1088        }
1089
1090        // Ensure that mcp-2 is removed once it is removed from the settings
1091        {
1092            let _server_events = assert_server_events(
1093                &store,
1094                vec![(server_2_id.clone(), ContextServerStatus::Stopped)],
1095                cx,
1096            );
1097            set_context_server_configuration(
1098                vec![(
1099                    server_1_id.0.clone(),
1100                    settings::ContextServerSettingsContent::Extension {
1101                        enabled: true,
1102                        settings: json!({
1103                            "somevalue": false
1104                        }),
1105                    },
1106                )],
1107                cx,
1108            );
1109
1110            cx.run_until_parked();
1111
1112            cx.update(|cx| {
1113                assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
1114            });
1115        }
1116
1117        // Ensure that nothing happens if the settings do not change
1118        {
1119            let _server_events = assert_server_events(&store, vec![], cx);
1120            set_context_server_configuration(
1121                vec![(
1122                    server_1_id.0.clone(),
1123                    settings::ContextServerSettingsContent::Extension {
1124                        enabled: true,
1125                        settings: json!({
1126                            "somevalue": false
1127                        }),
1128                    },
1129                )],
1130                cx,
1131            );
1132
1133            cx.run_until_parked();
1134
1135            cx.update(|cx| {
1136                assert_eq!(
1137                    store.read(cx).status_for_server(&server_1_id),
1138                    Some(ContextServerStatus::Running)
1139                );
1140                assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
1141            });
1142        }
1143    }
1144
1145    #[gpui::test]
1146    async fn test_context_server_enabled_disabled(cx: &mut TestAppContext) {
1147        const SERVER_1_ID: &str = "mcp-1";
1148
1149        let server_1_id = ContextServerId(SERVER_1_ID.into());
1150
1151        let (_fs, project) = setup_context_server_test(
1152            cx,
1153            json!({"code.rs": ""}),
1154            vec![(
1155                SERVER_1_ID.into(),
1156                ContextServerSettings::Stdio {
1157                    enabled: true,
1158                    command: ContextServerCommand {
1159                        path: "somebinary".into(),
1160                        args: vec!["arg".to_string()],
1161                        env: None,
1162                        timeout: None,
1163                    },
1164                },
1165            )],
1166        )
1167        .await;
1168
1169        let executor = cx.executor();
1170        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1171        let store = cx.new(|cx| {
1172            ContextServerStore::test_maintain_server_loop(
1173                Some(Box::new(move |id, _| {
1174                    Arc::new(ContextServer::new(
1175                        id.clone(),
1176                        Arc::new(create_fake_transport(id.0.to_string(), executor.clone())),
1177                    ))
1178                })),
1179                registry.clone(),
1180                project.read(cx).worktree_store(),
1181                project.downgrade(),
1182                cx,
1183            )
1184        });
1185
1186        // Ensure that mcp-1 starts up
1187        {
1188            let _server_events = assert_server_events(
1189                &store,
1190                vec![
1191                    (server_1_id.clone(), ContextServerStatus::Starting),
1192                    (server_1_id.clone(), ContextServerStatus::Running),
1193                ],
1194                cx,
1195            );
1196            cx.run_until_parked();
1197        }
1198
1199        // Ensure that mcp-1 is stopped once it is disabled.
1200        {
1201            let _server_events = assert_server_events(
1202                &store,
1203                vec![(server_1_id.clone(), ContextServerStatus::Stopped)],
1204                cx,
1205            );
1206            set_context_server_configuration(
1207                vec![(
1208                    server_1_id.0.clone(),
1209                    settings::ContextServerSettingsContent::Stdio {
1210                        enabled: false,
1211                        command: ContextServerCommand {
1212                            path: "somebinary".into(),
1213                            args: vec!["arg".to_string()],
1214                            env: None,
1215                            timeout: None,
1216                        },
1217                    },
1218                )],
1219                cx,
1220            );
1221
1222            cx.run_until_parked();
1223        }
1224
1225        // Ensure that mcp-1 is started once it is enabled again.
1226        {
1227            let _server_events = assert_server_events(
1228                &store,
1229                vec![
1230                    (server_1_id.clone(), ContextServerStatus::Starting),
1231                    (server_1_id.clone(), ContextServerStatus::Running),
1232                ],
1233                cx,
1234            );
1235            set_context_server_configuration(
1236                vec![(
1237                    server_1_id.0.clone(),
1238                    settings::ContextServerSettingsContent::Stdio {
1239                        enabled: true,
1240                        command: ContextServerCommand {
1241                            path: "somebinary".into(),
1242                            args: vec!["arg".to_string()],
1243                            timeout: None,
1244                            env: None,
1245                        },
1246                    },
1247                )],
1248                cx,
1249            );
1250
1251            cx.run_until_parked();
1252        }
1253    }
1254
1255    fn set_context_server_configuration(
1256        context_servers: Vec<(Arc<str>, settings::ContextServerSettingsContent)>,
1257        cx: &mut TestAppContext,
1258    ) {
1259        cx.update(|cx| {
1260            SettingsStore::update_global(cx, |store, cx| {
1261                store.update_user_settings(cx, |content| {
1262                    content.project.context_servers.clear();
1263                    for (id, config) in context_servers {
1264                        content.project.context_servers.insert(id, config);
1265                    }
1266                });
1267            })
1268        });
1269    }
1270
1271    #[gpui::test]
1272    async fn test_remote_context_server(cx: &mut TestAppContext) {
1273        const SERVER_ID: &str = "remote-server";
1274        let server_id = ContextServerId(SERVER_ID.into());
1275        let server_url = "http://example.com/api";
1276
1277        let (_fs, project) = setup_context_server_test(
1278            cx,
1279            json!({ "code.rs": "" }),
1280            vec![(
1281                SERVER_ID.into(),
1282                ContextServerSettings::Http {
1283                    enabled: true,
1284                    url: server_url.to_string(),
1285                    headers: Default::default(),
1286                    timeout: None,
1287                },
1288            )],
1289        )
1290        .await;
1291
1292        let client = FakeHttpClient::create(|_| async move {
1293            use http_client::AsyncBody;
1294
1295            let response = Response::builder()
1296                .status(200)
1297                .header("Content-Type", "application/json")
1298                .body(AsyncBody::from(
1299                    serde_json::to_string(&json!({
1300                        "jsonrpc": "2.0",
1301                        "id": 0,
1302                        "result": {
1303                            "protocolVersion": "2024-11-05",
1304                            "capabilities": {},
1305                            "serverInfo": {
1306                                "name": "test-server",
1307                                "version": "1.0.0"
1308                            }
1309                        }
1310                    }))
1311                    .unwrap(),
1312                ))
1313                .unwrap();
1314            Ok(response)
1315        });
1316        cx.update(|cx| cx.set_http_client(client));
1317        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1318        let store = cx.new(|cx| {
1319            ContextServerStore::test_maintain_server_loop(
1320                None,
1321                registry.clone(),
1322                project.read(cx).worktree_store(),
1323                project.downgrade(),
1324                cx,
1325            )
1326        });
1327
1328        let _server_events = assert_server_events(
1329            &store,
1330            vec![
1331                (server_id.clone(), ContextServerStatus::Starting),
1332                (server_id.clone(), ContextServerStatus::Running),
1333            ],
1334            cx,
1335        );
1336        cx.run_until_parked();
1337    }
1338
1339    struct ServerEvents {
1340        received_event_count: Rc<RefCell<usize>>,
1341        expected_event_count: usize,
1342        _subscription: Subscription,
1343    }
1344
1345    impl Drop for ServerEvents {
1346        fn drop(&mut self) {
1347            let actual_event_count = *self.received_event_count.borrow();
1348            assert_eq!(
1349                actual_event_count, self.expected_event_count,
1350                "
1351                Expected to receive {} context server store events, but received {} events",
1352                self.expected_event_count, actual_event_count
1353            );
1354        }
1355    }
1356
1357    #[gpui::test]
1358    async fn test_context_server_global_timeout(cx: &mut TestAppContext) {
1359        cx.update(|cx| {
1360            let settings_store = SettingsStore::test(cx);
1361            cx.set_global(settings_store);
1362            SettingsStore::update_global(cx, |store, cx| {
1363                store
1364                    .set_user_settings(r#"{"context_server_timeout": 90}"#, cx)
1365                    .expect("Failed to set test user settings");
1366            });
1367        });
1368
1369        let (_fs, project) = setup_context_server_test(cx, json!({"code.rs": ""}), vec![]).await;
1370
1371        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1372        let store = cx.new(|cx| {
1373            ContextServerStore::test(
1374                registry.clone(),
1375                project.read(cx).worktree_store(),
1376                project.downgrade(),
1377                cx,
1378            )
1379        });
1380
1381        let result = store.update(cx, |store, cx| {
1382            store.create_context_server(
1383                ContextServerId("test-server".into()),
1384                Arc::new(ContextServerConfiguration::Http {
1385                    url: url::Url::parse("http://localhost:8080")
1386                        .expect("Failed to parse test URL"),
1387                    headers: Default::default(),
1388                    timeout: None,
1389                }),
1390                cx,
1391            )
1392        });
1393
1394        assert!(
1395            result.is_ok(),
1396            "Server should be created successfully with global timeout"
1397        );
1398    }
1399
1400    #[gpui::test]
1401    async fn test_context_server_per_server_timeout_override(cx: &mut TestAppContext) {
1402        const SERVER_ID: &str = "test-server";
1403
1404        cx.update(|cx| {
1405            let settings_store = SettingsStore::test(cx);
1406            cx.set_global(settings_store);
1407            SettingsStore::update_global(cx, |store, cx| {
1408                store
1409                    .set_user_settings(r#"{"context_server_timeout": 60}"#, cx)
1410                    .expect("Failed to set test user settings");
1411            });
1412        });
1413
1414        let (_fs, project) = setup_context_server_test(
1415            cx,
1416            json!({"code.rs": ""}),
1417            vec![(
1418                SERVER_ID.into(),
1419                ContextServerSettings::Http {
1420                    enabled: true,
1421                    url: "http://localhost:8080".to_string(),
1422                    headers: Default::default(),
1423                    timeout: Some(120),
1424                },
1425            )],
1426        )
1427        .await;
1428
1429        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1430        let store = cx.new(|cx| {
1431            ContextServerStore::test(
1432                registry.clone(),
1433                project.read(cx).worktree_store(),
1434                project.downgrade(),
1435                cx,
1436            )
1437        });
1438
1439        let result = store.update(cx, |store, cx| {
1440            store.create_context_server(
1441                ContextServerId("test-server".into()),
1442                Arc::new(ContextServerConfiguration::Http {
1443                    url: url::Url::parse("http://localhost:8080")
1444                        .expect("Failed to parse test URL"),
1445                    headers: Default::default(),
1446                    timeout: Some(120),
1447                }),
1448                cx,
1449            )
1450        });
1451
1452        assert!(
1453            result.is_ok(),
1454            "Server should be created successfully with per-server timeout override"
1455        );
1456    }
1457
1458    #[gpui::test]
1459    async fn test_context_server_stdio_timeout(cx: &mut TestAppContext) {
1460        const SERVER_ID: &str = "stdio-server";
1461
1462        let (_fs, project) = setup_context_server_test(
1463            cx,
1464            json!({"code.rs": ""}),
1465            vec![(
1466                SERVER_ID.into(),
1467                ContextServerSettings::Stdio {
1468                    enabled: true,
1469                    command: ContextServerCommand {
1470                        path: "/usr/bin/node".into(),
1471                        args: vec!["server.js".into()],
1472                        env: None,
1473                        timeout: Some(180000),
1474                    },
1475                },
1476            )],
1477        )
1478        .await;
1479
1480        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1481        let store = cx.new(|cx| {
1482            ContextServerStore::test(
1483                registry.clone(),
1484                project.read(cx).worktree_store(),
1485                project.downgrade(),
1486                cx,
1487            )
1488        });
1489
1490        let result = store.update(cx, |store, cx| {
1491            store.create_context_server(
1492                ContextServerId("stdio-server".into()),
1493                Arc::new(ContextServerConfiguration::Custom {
1494                    command: ContextServerCommand {
1495                        path: "/usr/bin/node".into(),
1496                        args: vec!["server.js".into()],
1497                        env: None,
1498                        timeout: Some(180000),
1499                    },
1500                }),
1501                cx,
1502            )
1503        });
1504
1505        assert!(
1506            result.is_ok(),
1507            "Stdio server should be created successfully with timeout"
1508        );
1509    }
1510
1511    fn dummy_server_settings() -> ContextServerSettings {
1512        ContextServerSettings::Stdio {
1513            enabled: true,
1514            command: ContextServerCommand {
1515                path: "somebinary".into(),
1516                args: vec!["arg".to_string()],
1517                env: None,
1518                timeout: None,
1519            },
1520        }
1521    }
1522
1523    fn assert_server_events(
1524        store: &Entity<ContextServerStore>,
1525        expected_events: Vec<(ContextServerId, ContextServerStatus)>,
1526        cx: &mut TestAppContext,
1527    ) -> ServerEvents {
1528        cx.update(|cx| {
1529            let mut ix = 0;
1530            let received_event_count = Rc::new(RefCell::new(0));
1531            let expected_event_count = expected_events.len();
1532            let subscription = cx.subscribe(store, {
1533                let received_event_count = received_event_count.clone();
1534                move |_, event, _| match event {
1535                    Event::ServerStatusChanged {
1536                        server_id: actual_server_id,
1537                        status: actual_status,
1538                    } => {
1539                        let (expected_server_id, expected_status) = &expected_events[ix];
1540
1541                        assert_eq!(
1542                            actual_server_id, expected_server_id,
1543                            "Expected different server id at index {}",
1544                            ix
1545                        );
1546                        assert_eq!(
1547                            actual_status, expected_status,
1548                            "Expected different status at index {}",
1549                            ix
1550                        );
1551                        ix += 1;
1552                        *received_event_count.borrow_mut() += 1;
1553                    }
1554                }
1555            });
1556            ServerEvents {
1557                expected_event_count,
1558                received_event_count,
1559                _subscription: subscription,
1560            }
1561        })
1562    }
1563
1564    async fn setup_context_server_test(
1565        cx: &mut TestAppContext,
1566        files: serde_json::Value,
1567        context_server_configurations: Vec<(Arc<str>, ContextServerSettings)>,
1568    ) -> (Arc<FakeFs>, Entity<Project>) {
1569        cx.update(|cx| {
1570            let settings_store = SettingsStore::test(cx);
1571            cx.set_global(settings_store);
1572            let mut settings = ProjectSettings::get_global(cx).clone();
1573            for (id, config) in context_server_configurations {
1574                settings.context_servers.insert(id, config);
1575            }
1576            ProjectSettings::override_global(settings, cx);
1577        });
1578
1579        let fs = FakeFs::new(cx.executor());
1580        fs.insert_tree(path!("/test"), files).await;
1581        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
1582
1583        (fs, project)
1584    }
1585
1586    struct FakeContextServerDescriptor {
1587        path: PathBuf,
1588    }
1589
1590    impl FakeContextServerDescriptor {
1591        fn new(path: impl Into<PathBuf>) -> Self {
1592            Self { path: path.into() }
1593        }
1594    }
1595
1596    impl ContextServerDescriptor for FakeContextServerDescriptor {
1597        fn command(
1598            &self,
1599            _worktree_store: Entity<WorktreeStore>,
1600            _cx: &AsyncApp,
1601        ) -> Task<Result<ContextServerCommand>> {
1602            Task::ready(Ok(ContextServerCommand {
1603                path: self.path.clone(),
1604                args: vec!["arg1".to_string(), "arg2".to_string()],
1605                env: None,
1606                timeout: None,
1607            }))
1608        }
1609
1610        fn configuration(
1611            &self,
1612            _worktree_store: Entity<WorktreeStore>,
1613            _cx: &AsyncApp,
1614        ) -> Task<Result<Option<::extension::ContextServerConfiguration>>> {
1615            Task::ready(Ok(None))
1616        }
1617    }
1618}