context_server_store.rs

   1pub mod extension;
   2pub mod registry;
   3
   4use std::sync::Arc;
   5use std::time::Duration;
   6
   7use anyhow::{Context as _, Result};
   8use collections::{HashMap, HashSet};
   9use context_server::{ContextServer, ContextServerCommand, ContextServerId};
  10use futures::{FutureExt as _, future::join_all};
  11use gpui::{App, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, actions};
  12use registry::ContextServerDescriptorRegistry;
  13use settings::{Settings as _, SettingsStore};
  14use util::{ResultExt as _, rel_path::RelPath};
  15
  16use crate::{
  17    Project,
  18    project_settings::{ContextServerSettings, ProjectSettings},
  19    worktree_store::WorktreeStore,
  20};
  21
/// Upper bound, in seconds, applied to every context server timeout.
///
/// Per-server and global timeout settings are clamped to this value so that
/// extremely large timeouts cannot tie up resources indefinitely.
const MAX_TIMEOUT_SECS: u64 = 600; // 10 minutes
  25
/// Initializes the context server store module by delegating to
/// `extension::init` with the given app context.
pub fn init(cx: &mut App) {
    extension::init(cx);
}
  29
// Declares the `Restart` action through gpui's `actions!` macro.
// NOTE(review): `context_server` is presumably the action namespace — confirm
// against the macro's documentation.
actions!(
    context_server,
    [
        /// Restarts the context server.
        Restart
    ]
);
  37
/// Public lifecycle status of a context server.
///
/// This is the payload-free counterpart of the store's internal per-server
/// state: it is derived from that state and reported in
/// `Event::ServerStatusChanged`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ContextServerStatus {
    /// The server's start task has been spawned but has not finished yet.
    Starting,
    /// The server started successfully.
    Running,
    /// The server was stopped (also reported when a server is removed).
    Stopped,
    /// The server failed to start; carries the stringified start error.
    Error(Arc<str>),
}
  45
  46impl ContextServerStatus {
  47    fn from_state(state: &ContextServerState) -> Self {
  48        match state {
  49            ContextServerState::Starting { .. } => ContextServerStatus::Starting,
  50            ContextServerState::Running { .. } => ContextServerStatus::Running,
  51            ContextServerState::Stopped { .. } => ContextServerStatus::Stopped,
  52            ContextServerState::Error { error, .. } => ContextServerStatus::Error(error.clone()),
  53        }
  54    }
  55}
  56
/// Internal per-server bookkeeping kept by `ContextServerStore`.
///
/// Every variant carries the server handle and the configuration it was
/// created from, so both remain available regardless of lifecycle phase.
enum ContextServerState {
    /// The server is being started; `_task` is the task awaiting the start.
    // NOTE(review): the task is only held, never read — presumably dropping it
    // cancels the pending start; confirm against gpui's `Task` semantics.
    Starting {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
        _task: Task<()>,
    },
    /// The server started successfully.
    Running {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
    },
    /// The server was stopped but is still tracked by the store.
    Stopped {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
    },
    /// The server failed to start; `error` is the stringified failure.
    Error {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
        error: Arc<str>,
    },
}
  77
  78impl ContextServerState {
  79    pub fn server(&self) -> Arc<ContextServer> {
  80        match self {
  81            ContextServerState::Starting { server, .. } => server.clone(),
  82            ContextServerState::Running { server, .. } => server.clone(),
  83            ContextServerState::Stopped { server, .. } => server.clone(),
  84            ContextServerState::Error { server, .. } => server.clone(),
  85        }
  86    }
  87
  88    pub fn configuration(&self) -> Arc<ContextServerConfiguration> {
  89        match self {
  90            ContextServerState::Starting { configuration, .. } => configuration.clone(),
  91            ContextServerState::Running { configuration, .. } => configuration.clone(),
  92            ContextServerState::Stopped { configuration, .. } => configuration.clone(),
  93            ContextServerState::Error { configuration, .. } => configuration.clone(),
  94        }
  95    }
  96}
  97
/// Fully resolved configuration used to construct a context server.
///
/// Produced from `ContextServerSettings` by
/// `ContextServerConfiguration::from_settings`.
#[derive(Debug, PartialEq, Eq)]
pub enum ContextServerConfiguration {
    /// A server launched from a command given directly in the settings.
    Custom {
        command: ContextServerCommand,
    },
    /// A server whose command is supplied by a registered descriptor, paired
    /// with the settings value configured for it.
    Extension {
        command: ContextServerCommand,
        settings: serde_json::Value,
    },
    /// A server reached over HTTP.
    Http {
        /// Parsed endpoint URL.
        url: url::Url,
        /// Headers passed along when the HTTP server is constructed.
        headers: HashMap<String, String>,
        /// Optional per-server timeout in seconds; the global setting is used
        /// when absent.
        timeout: Option<u64>,
    },
}
 113
 114impl ContextServerConfiguration {
 115    pub fn command(&self) -> Option<&ContextServerCommand> {
 116        match self {
 117            ContextServerConfiguration::Custom { command } => Some(command),
 118            ContextServerConfiguration::Extension { command, .. } => Some(command),
 119            ContextServerConfiguration::Http { .. } => None,
 120        }
 121    }
 122
 123    pub async fn from_settings(
 124        settings: ContextServerSettings,
 125        id: ContextServerId,
 126        registry: Entity<ContextServerDescriptorRegistry>,
 127        worktree_store: Entity<WorktreeStore>,
 128        cx: &AsyncApp,
 129    ) -> Option<Self> {
 130        match settings {
 131            ContextServerSettings::Stdio {
 132                enabled: _,
 133                command,
 134            } => Some(ContextServerConfiguration::Custom { command }),
 135            ContextServerSettings::Extension {
 136                enabled: _,
 137                settings,
 138            } => {
 139                let descriptor = cx
 140                    .update(|cx| registry.read(cx).context_server_descriptor(&id.0))
 141                    .ok()
 142                    .flatten()?;
 143
 144                match descriptor.command(worktree_store, cx).await {
 145                    Ok(command) => {
 146                        Some(ContextServerConfiguration::Extension { command, settings })
 147                    }
 148                    Err(e) => {
 149                        log::error!(
 150                            "Failed to create context server configuration from settings: {e:#}"
 151                        );
 152                        None
 153                    }
 154                }
 155            }
 156            ContextServerSettings::Http {
 157                enabled: _,
 158                url,
 159                headers: auth,
 160                timeout,
 161            } => {
 162                let url = url::Url::parse(&url).log_err()?;
 163                Some(ContextServerConfiguration::Http {
 164                    url,
 165                    headers: auth,
 166                    timeout,
 167                })
 168            }
 169        }
 170    }
 171}
 172
/// Callback that builds a `ContextServer` from an id and its configuration.
///
/// When a store holds a factory, it is used in place of the store's own
/// server construction logic.
pub type ContextServerFactory =
    Box<dyn Fn(ContextServerId, Arc<ContextServerConfiguration>) -> Arc<ContextServer>>;
 175
/// Tracks the context servers of a project and keeps their lifecycle in sync
/// with settings and the descriptor registry.
pub struct ContextServerStore {
    /// Snapshot of the `context_servers` project settings, keyed by server id.
    context_server_settings: HashMap<Arc<str>, ContextServerSettings>,
    /// Lifecycle state of every server currently tracked by the store.
    servers: HashMap<ContextServerId, ContextServerState>,
    /// Used to locate project settings and a fallback root directory.
    worktree_store: Entity<WorktreeStore>,
    /// Weak handle to the owning project, consulted for the active project
    /// directory when creating command-based servers.
    project: WeakEntity<Project>,
    /// Registry of context server descriptors.
    registry: Entity<ContextServerDescriptorRegistry>,
    /// The in-flight reconciliation task, if one is running.
    update_servers_task: Option<Task<Result<()>>>,
    /// Optional override for constructing servers.
    context_server_factory: Option<ContextServerFactory>,
    /// Set when a change arrives while a reconciliation is already running, so
    /// that another pass is scheduled once it finishes.
    needs_server_update: bool,
    /// Registry and settings observers.
    // NOTE(review): presumably held only to keep the observers alive — confirm.
    _subscriptions: Vec<Subscription>,
}
 187
/// Events emitted by `ContextServerStore`.
pub enum Event {
    /// Emitted whenever a server's state is updated, and with
    /// `ContextServerStatus::Stopped` when a server is removed from the store.
    ServerStatusChanged {
        server_id: ContextServerId,
        status: ContextServerStatus,
    },
}

// Marks the store as an emitter of `Event`.
impl EventEmitter<Event> for ContextServerStore {}
 196
 197impl ContextServerStore {
 198    pub fn new(
 199        worktree_store: Entity<WorktreeStore>,
 200        weak_project: WeakEntity<Project>,
 201        cx: &mut Context<Self>,
 202    ) -> Self {
 203        Self::new_internal(
 204            true,
 205            None,
 206            ContextServerDescriptorRegistry::default_global(cx),
 207            worktree_store,
 208            weak_project,
 209            cx,
 210        )
 211    }
 212
 213    /// Returns all configured context server ids, excluding the ones that are disabled
 214    pub fn configured_server_ids(&self) -> Vec<ContextServerId> {
 215        self.context_server_settings
 216            .iter()
 217            .filter(|(_, settings)| settings.enabled())
 218            .map(|(id, _)| ContextServerId(id.clone()))
 219            .collect()
 220    }
 221
 222    #[cfg(any(test, feature = "test-support"))]
 223    pub fn test(
 224        registry: Entity<ContextServerDescriptorRegistry>,
 225        worktree_store: Entity<WorktreeStore>,
 226        weak_project: WeakEntity<Project>,
 227        cx: &mut Context<Self>,
 228    ) -> Self {
 229        Self::new_internal(false, None, registry, worktree_store, weak_project, cx)
 230    }
 231
 232    #[cfg(any(test, feature = "test-support"))]
 233    pub fn test_maintain_server_loop(
 234        context_server_factory: Option<ContextServerFactory>,
 235        registry: Entity<ContextServerDescriptorRegistry>,
 236        worktree_store: Entity<WorktreeStore>,
 237        weak_project: WeakEntity<Project>,
 238        cx: &mut Context<Self>,
 239    ) -> Self {
 240        Self::new_internal(
 241            true,
 242            context_server_factory,
 243            registry,
 244            worktree_store,
 245            weak_project,
 246            cx,
 247        )
 248    }
 249
 250    fn new_internal(
 251        maintain_server_loop: bool,
 252        context_server_factory: Option<ContextServerFactory>,
 253        registry: Entity<ContextServerDescriptorRegistry>,
 254        worktree_store: Entity<WorktreeStore>,
 255        weak_project: WeakEntity<Project>,
 256        cx: &mut Context<Self>,
 257    ) -> Self {
 258        let subscriptions = if maintain_server_loop {
 259            vec![
 260                cx.observe(&registry, |this, _registry, cx| {
 261                    this.available_context_servers_changed(cx);
 262                }),
 263                cx.observe_global::<SettingsStore>(|this, cx| {
 264                    let settings =
 265                        &Self::resolve_project_settings(&this.worktree_store, cx).context_servers;
 266                    if &this.context_server_settings == settings {
 267                        return;
 268                    }
 269                    this.context_server_settings = settings.clone();
 270                    this.available_context_servers_changed(cx);
 271                }),
 272            ]
 273        } else {
 274            Vec::new()
 275        };
 276
 277        let mut this = Self {
 278            _subscriptions: subscriptions,
 279            context_server_settings: Self::resolve_project_settings(&worktree_store, cx)
 280                .context_servers
 281                .clone(),
 282            worktree_store,
 283            project: weak_project,
 284            registry,
 285            needs_server_update: false,
 286            servers: HashMap::default(),
 287            update_servers_task: None,
 288            context_server_factory,
 289        };
 290        if maintain_server_loop {
 291            this.available_context_servers_changed(cx);
 292        }
 293        this
 294    }
 295
 296    pub fn get_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
 297        self.servers.get(id).map(|state| state.server())
 298    }
 299
 300    pub fn get_running_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
 301        if let Some(ContextServerState::Running { server, .. }) = self.servers.get(id) {
 302            Some(server.clone())
 303        } else {
 304            None
 305        }
 306    }
 307
 308    pub fn status_for_server(&self, id: &ContextServerId) -> Option<ContextServerStatus> {
 309        self.servers.get(id).map(ContextServerStatus::from_state)
 310    }
 311
 312    pub fn configuration_for_server(
 313        &self,
 314        id: &ContextServerId,
 315    ) -> Option<Arc<ContextServerConfiguration>> {
 316        self.servers.get(id).map(|state| state.configuration())
 317    }
 318
 319    pub fn server_ids(&self, cx: &App) -> HashSet<ContextServerId> {
 320        self.servers
 321            .keys()
 322            .cloned()
 323            .chain(
 324                self.registry
 325                    .read(cx)
 326                    .context_server_descriptors()
 327                    .into_iter()
 328                    .map(|(id, _)| ContextServerId(id)),
 329            )
 330            .collect()
 331    }
 332
 333    pub fn running_servers(&self) -> Vec<Arc<ContextServer>> {
 334        self.servers
 335            .values()
 336            .filter_map(|state| {
 337                if let ContextServerState::Running { server, .. } = state {
 338                    Some(server.clone())
 339                } else {
 340                    None
 341                }
 342            })
 343            .collect()
 344    }
 345
 346    pub fn start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
 347        cx.spawn(async move |this, cx| {
 348            let this = this.upgrade().context("Context server store dropped")?;
 349            let settings = this
 350                .update(cx, |this, _| {
 351                    this.context_server_settings.get(&server.id().0).cloned()
 352                })
 353                .ok()
 354                .flatten()
 355                .context("Failed to get context server settings")?;
 356
 357            if !settings.enabled() {
 358                return Ok(());
 359            }
 360
 361            let (registry, worktree_store) = this.update(cx, |this, _| {
 362                (this.registry.clone(), this.worktree_store.clone())
 363            })?;
 364            let configuration = ContextServerConfiguration::from_settings(
 365                settings,
 366                server.id(),
 367                registry,
 368                worktree_store,
 369                cx,
 370            )
 371            .await
 372            .context("Failed to create context server configuration")?;
 373
 374            this.update(cx, |this, cx| {
 375                this.run_server(server, Arc::new(configuration), cx)
 376            })
 377        })
 378        .detach_and_log_err(cx);
 379    }
 380
 381    pub fn stop_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
 382        if matches!(
 383            self.servers.get(id),
 384            Some(ContextServerState::Stopped { .. })
 385        ) {
 386            return Ok(());
 387        }
 388
 389        let state = self
 390            .servers
 391            .remove(id)
 392            .context("Context server not found")?;
 393
 394        let server = state.server();
 395        let configuration = state.configuration();
 396        let mut result = Ok(());
 397        if let ContextServerState::Running { server, .. } = &state {
 398            result = server.stop();
 399        }
 400        drop(state);
 401
 402        self.update_server_state(
 403            id.clone(),
 404            ContextServerState::Stopped {
 405                configuration,
 406                server,
 407            },
 408            cx,
 409        );
 410
 411        result
 412    }
 413
 414    fn run_server(
 415        &mut self,
 416        server: Arc<ContextServer>,
 417        configuration: Arc<ContextServerConfiguration>,
 418        cx: &mut Context<Self>,
 419    ) {
 420        let id = server.id();
 421        if matches!(
 422            self.servers.get(&id),
 423            Some(ContextServerState::Starting { .. } | ContextServerState::Running { .. })
 424        ) {
 425            self.stop_server(&id, cx).log_err();
 426        }
 427        let task = cx.spawn({
 428            let id = server.id();
 429            let server = server.clone();
 430            let configuration = configuration.clone();
 431
 432            async move |this, cx| {
 433                match server.clone().start(cx).await {
 434                    Ok(_) => {
 435                        debug_assert!(server.client().is_some());
 436
 437                        this.update(cx, |this, cx| {
 438                            this.update_server_state(
 439                                id.clone(),
 440                                ContextServerState::Running {
 441                                    server,
 442                                    configuration,
 443                                },
 444                                cx,
 445                            )
 446                        })
 447                        .log_err()
 448                    }
 449                    Err(err) => {
 450                        log::error!("{} context server failed to start: {}", id, err);
 451                        this.update(cx, |this, cx| {
 452                            this.update_server_state(
 453                                id.clone(),
 454                                ContextServerState::Error {
 455                                    configuration,
 456                                    server,
 457                                    error: err.to_string().into(),
 458                                },
 459                                cx,
 460                            )
 461                        })
 462                        .log_err()
 463                    }
 464                };
 465            }
 466        });
 467
 468        self.update_server_state(
 469            id.clone(),
 470            ContextServerState::Starting {
 471                configuration,
 472                _task: task,
 473                server,
 474            },
 475            cx,
 476        );
 477    }
 478
 479    fn remove_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
 480        let state = self
 481            .servers
 482            .remove(id)
 483            .context("Context server not found")?;
 484        drop(state);
 485        cx.emit(Event::ServerStatusChanged {
 486            server_id: id.clone(),
 487            status: ContextServerStatus::Stopped,
 488        });
 489        Ok(())
 490    }
 491
 492    fn create_context_server(
 493        &self,
 494        id: ContextServerId,
 495        configuration: Arc<ContextServerConfiguration>,
 496        cx: &mut Context<Self>,
 497    ) -> Result<Arc<ContextServer>> {
 498        let global_timeout =
 499            Self::resolve_project_settings(&self.worktree_store, cx).context_server_timeout;
 500
 501        if let Some(factory) = self.context_server_factory.as_ref() {
 502            return Ok(factory(id, configuration));
 503        }
 504
 505        match configuration.as_ref() {
 506            ContextServerConfiguration::Http {
 507                url,
 508                headers,
 509                timeout,
 510            } => Ok(Arc::new(ContextServer::http(
 511                id,
 512                url,
 513                headers.clone(),
 514                cx.http_client(),
 515                cx.background_executor().clone(),
 516                Some(Duration::from_secs(
 517                    timeout.unwrap_or(global_timeout).min(MAX_TIMEOUT_SECS),
 518                )),
 519            )?)),
 520            _ => {
 521                let root_path = self
 522                    .project
 523                    .read_with(cx, |project, cx| project.active_project_directory(cx))
 524                    .ok()
 525                    .flatten()
 526                    .or_else(|| {
 527                        self.worktree_store.read_with(cx, |store, cx| {
 528                            store.visible_worktrees(cx).fold(None, |acc, item| {
 529                                if acc.is_none() {
 530                                    item.read(cx).root_dir()
 531                                } else {
 532                                    acc
 533                                }
 534                            })
 535                        })
 536                    });
 537
 538                let mut command = configuration
 539                    .command()
 540                    .context("Missing command configuration for stdio context server")?
 541                    .clone();
 542                command.timeout = Some(
 543                    command
 544                        .timeout
 545                        .unwrap_or(global_timeout)
 546                        .min(MAX_TIMEOUT_SECS),
 547                );
 548
 549                Ok(Arc::new(ContextServer::stdio(id, command, root_path)))
 550            }
 551        }
 552    }
 553
 554    fn resolve_project_settings<'a>(
 555        worktree_store: &'a Entity<WorktreeStore>,
 556        cx: &'a App,
 557    ) -> &'a ProjectSettings {
 558        let location = worktree_store
 559            .read(cx)
 560            .visible_worktrees(cx)
 561            .next()
 562            .map(|worktree| settings::SettingsLocation {
 563                worktree_id: worktree.read(cx).id(),
 564                path: RelPath::empty(),
 565            });
 566        ProjectSettings::get(location, cx)
 567    }
 568
 569    fn update_server_state(
 570        &mut self,
 571        id: ContextServerId,
 572        state: ContextServerState,
 573        cx: &mut Context<Self>,
 574    ) {
 575        let status = ContextServerStatus::from_state(&state);
 576        self.servers.insert(id.clone(), state);
 577        cx.emit(Event::ServerStatusChanged {
 578            server_id: id,
 579            status,
 580        });
 581    }
 582
 583    fn available_context_servers_changed(&mut self, cx: &mut Context<Self>) {
 584        if self.update_servers_task.is_some() {
 585            self.needs_server_update = true;
 586        } else {
 587            self.needs_server_update = false;
 588            self.update_servers_task = Some(cx.spawn(async move |this, cx| {
 589                if let Err(err) = Self::maintain_servers(this.clone(), cx).await {
 590                    log::error!("Error maintaining context servers: {}", err);
 591                }
 592
 593                this.update(cx, |this, cx| {
 594                    this.update_servers_task.take();
 595                    if this.needs_server_update {
 596                        this.available_context_servers_changed(cx);
 597                    }
 598                })?;
 599
 600                Ok(())
 601            }));
 602        }
 603    }
 604
 605    async fn maintain_servers(this: WeakEntity<Self>, cx: &mut AsyncApp) -> Result<()> {
 606        let (mut configured_servers, registry, worktree_store) = this.update(cx, |this, _| {
 607            (
 608                this.context_server_settings.clone(),
 609                this.registry.clone(),
 610                this.worktree_store.clone(),
 611            )
 612        })?;
 613
 614        for (id, _) in
 615            registry.read_with(cx, |registry, _| registry.context_server_descriptors())?
 616        {
 617            configured_servers
 618                .entry(id)
 619                .or_insert(ContextServerSettings::default_extension());
 620        }
 621
 622        let (enabled_servers, disabled_servers): (HashMap<_, _>, HashMap<_, _>) =
 623            configured_servers
 624                .into_iter()
 625                .partition(|(_, settings)| settings.enabled());
 626
 627        let configured_servers = join_all(enabled_servers.into_iter().map(|(id, settings)| {
 628            let id = ContextServerId(id);
 629            ContextServerConfiguration::from_settings(
 630                settings,
 631                id.clone(),
 632                registry.clone(),
 633                worktree_store.clone(),
 634                cx,
 635            )
 636            .map(|config| (id, config))
 637        }))
 638        .await
 639        .into_iter()
 640        .filter_map(|(id, config)| config.map(|config| (id, config)))
 641        .collect::<HashMap<_, _>>();
 642
 643        let mut servers_to_start = Vec::new();
 644        let mut servers_to_remove = HashSet::default();
 645        let mut servers_to_stop = HashSet::default();
 646
 647        this.update(cx, |this, cx| {
 648            for server_id in this.servers.keys() {
 649                // All servers that are not in desired_servers should be removed from the store.
 650                // This can happen if the user removed a server from the context server settings.
 651                if !configured_servers.contains_key(server_id) {
 652                    if disabled_servers.contains_key(&server_id.0) {
 653                        servers_to_stop.insert(server_id.clone());
 654                    } else {
 655                        servers_to_remove.insert(server_id.clone());
 656                    }
 657                }
 658            }
 659
 660            for (id, config) in configured_servers {
 661                let state = this.servers.get(&id);
 662                let is_stopped = matches!(state, Some(ContextServerState::Stopped { .. }));
 663                let existing_config = state.as_ref().map(|state| state.configuration());
 664                if existing_config.as_deref() != Some(&config) || is_stopped {
 665                    let config = Arc::new(config);
 666                    let server = this.create_context_server(id.clone(), config.clone(), cx)?;
 667                    servers_to_start.push((server, config));
 668                    if this.servers.contains_key(&id) {
 669                        servers_to_stop.insert(id);
 670                    }
 671                }
 672            }
 673
 674            anyhow::Ok(())
 675        })??;
 676
 677        this.update(cx, |this, cx| {
 678            for id in servers_to_stop {
 679                this.stop_server(&id, cx)?;
 680            }
 681            for id in servers_to_remove {
 682                this.remove_server(&id, cx)?;
 683            }
 684            for (server, config) in servers_to_start {
 685                this.run_server(server, config, cx);
 686            }
 687            anyhow::Ok(())
 688        })?
 689    }
 690}
 691
 692#[cfg(test)]
 693mod tests {
 694    use super::*;
 695    use crate::{
 696        FakeFs, Project, context_server_store::registry::ContextServerDescriptor,
 697        project_settings::ProjectSettings,
 698    };
 699    use context_server::test::create_fake_transport;
 700    use gpui::{AppContext, TestAppContext, UpdateGlobal as _};
 701    use http_client::{FakeHttpClient, Response};
 702    use serde_json::json;
 703    use std::{cell::RefCell, path::PathBuf, rc::Rc};
 704    use util::path;
 705
    /// Checks the status reported by the store as two servers are started one
    /// after the other and the second one is then stopped.
    #[gpui::test]
    async fn test_context_server_status(cx: &mut TestAppContext) {
        const SERVER_1_ID: &str = "mcp-1";
        const SERVER_2_ID: &str = "mcp-2";

        let (_fs, project) = setup_context_server_test(
            cx,
            json!({"code.rs": ""}),
            vec![
                (SERVER_1_ID.into(), dummy_server_settings()),
                (SERVER_2_ID.into(), dummy_server_settings()),
            ],
        )
        .await;

        // `ContextServerStore::test` disables the maintenance loop, so servers
        // only start when the test asks for it.
        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
        let store = cx.new(|cx| {
            ContextServerStore::test(
                registry.clone(),
                project.read(cx).worktree_store(),
                project.downgrade(),
                cx,
            )
        });

        let server_1_id = ContextServerId(SERVER_1_ID.into());
        let server_2_id = ContextServerId(SERVER_2_ID.into());

        let server_1 = Arc::new(ContextServer::new(
            server_1_id.clone(),
            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
        ));
        let server_2 = Arc::new(ContextServer::new(
            server_2_id.clone(),
            Arc::new(create_fake_transport(SERVER_2_ID, cx.executor())),
        ));

        store.update(cx, |store, cx| store.start_server(server_1, cx));

        cx.run_until_parked();

        // Only the first server is known to the store so far.
        cx.update(|cx| {
            assert_eq!(
                store.read(cx).status_for_server(&server_1_id),
                Some(ContextServerStatus::Running)
            );
            assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
        });

        store.update(cx, |store, cx| store.start_server(server_2.clone(), cx));

        cx.run_until_parked();

        cx.update(|cx| {
            assert_eq!(
                store.read(cx).status_for_server(&server_1_id),
                Some(ContextServerStatus::Running)
            );
            assert_eq!(
                store.read(cx).status_for_server(&server_2_id),
                Some(ContextServerStatus::Running)
            );
        });

        store
            .update(cx, |store, cx| store.stop_server(&server_2_id, cx))
            .unwrap();

        // Stopping the second server must not affect the first one.
        cx.update(|cx| {
            assert_eq!(
                store.read(cx).status_for_server(&server_1_id),
                Some(ContextServerStatus::Running)
            );
            assert_eq!(
                store.read(cx).status_for_server(&server_2_id),
                Some(ContextServerStatus::Stopped)
            );
        });
    }
 785
    /// Checks the sequence of status events emitted as two servers are started
    /// one after the other and the second one is then stopped.
    #[gpui::test]
    async fn test_context_server_status_events(cx: &mut TestAppContext) {
        const SERVER_1_ID: &str = "mcp-1";
        const SERVER_2_ID: &str = "mcp-2";

        let (_fs, project) = setup_context_server_test(
            cx,
            json!({"code.rs": ""}),
            vec![
                (SERVER_1_ID.into(), dummy_server_settings()),
                (SERVER_2_ID.into(), dummy_server_settings()),
            ],
        )
        .await;

        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
        let store = cx.new(|cx| {
            ContextServerStore::test(
                registry.clone(),
                project.read(cx).worktree_store(),
                project.downgrade(),
                cx,
            )
        });

        let server_1_id = ContextServerId(SERVER_1_ID.into());
        let server_2_id = ContextServerId(SERVER_2_ID.into());

        let server_1 = Arc::new(ContextServer::new(
            server_1_id.clone(),
            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
        ));
        let server_2 = Arc::new(ContextServer::new(
            server_2_id.clone(),
            Arc::new(create_fake_transport(SERVER_2_ID, cx.executor())),
        ));

        // The expected events are registered before any server is started.
        // NOTE(review): the returned value is kept alive until the end of the
        // test, presumably so the helper can verify the events — confirm
        // against `assert_server_events`.
        let _server_events = assert_server_events(
            &store,
            vec![
                (server_1_id.clone(), ContextServerStatus::Starting),
                (server_1_id, ContextServerStatus::Running),
                (server_2_id.clone(), ContextServerStatus::Starting),
                (server_2_id.clone(), ContextServerStatus::Running),
                (server_2_id.clone(), ContextServerStatus::Stopped),
            ],
            cx,
        );

        store.update(cx, |store, cx| store.start_server(server_1, cx));

        cx.run_until_parked();

        store.update(cx, |store, cx| store.start_server(server_2.clone(), cx));

        cx.run_until_parked();

        store
            .update(cx, |store, cx| store.stop_server(&server_2_id, cx))
            .unwrap();
    }
 847
    /// Starts two distinct server instances that share one id back to back,
    /// and checks that the store ends up with a single running server and
    /// emits the expected status events.
    #[gpui::test(iterations = 25)]
    async fn test_context_server_concurrent_starts(cx: &mut TestAppContext) {
        const SERVER_1_ID: &str = "mcp-1";

        let (_fs, project) = setup_context_server_test(
            cx,
            json!({"code.rs": ""}),
            vec![(SERVER_1_ID.into(), dummy_server_settings())],
        )
        .await;

        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
        let store = cx.new(|cx| {
            ContextServerStore::test(
                registry.clone(),
                project.read(cx).worktree_store(),
                project.downgrade(),
                cx,
            )
        });

        let server_id = ContextServerId(SERVER_1_ID.into());

        let server_with_same_id_1 = Arc::new(ContextServer::new(
            server_id.clone(),
            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
        ));
        let server_with_same_id_2 = Arc::new(ContextServer::new(
            server_id.clone(),
            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
        ));

        // If we start another server with the same id, we should report that we stopped the previous one
        let _server_events = assert_server_events(
            &store,
            vec![
                (server_id.clone(), ContextServerStatus::Starting),
                (server_id.clone(), ContextServerStatus::Stopped),
                (server_id.clone(), ContextServerStatus::Starting),
                (server_id.clone(), ContextServerStatus::Running),
            ],
            cx,
        );

        // Both starts are issued before the executor runs, so the second one
        // arrives while the first is still pending.
        store.update(cx, |store, cx| {
            store.start_server(server_with_same_id_1.clone(), cx)
        });
        store.update(cx, |store, cx| {
            store.start_server(server_with_same_id_2.clone(), cx)
        });

        cx.run_until_parked();

        cx.update(|cx| {
            assert_eq!(
                store.read(cx).status_for_server(&server_id),
                Some(ContextServerStatus::Running)
            );
        });
    }
 908
 909    #[gpui::test]
 910    async fn test_context_server_maintain_servers_loop(cx: &mut TestAppContext) {
 911        const SERVER_1_ID: &str = "mcp-1";
 912        const SERVER_2_ID: &str = "mcp-2";
 913
 914        let server_1_id = ContextServerId(SERVER_1_ID.into());
 915        let server_2_id = ContextServerId(SERVER_2_ID.into());
 916
 917        let fake_descriptor_1 = Arc::new(FakeContextServerDescriptor::new(SERVER_1_ID));
 918
 919        let (_fs, project) = setup_context_server_test(
 920            cx,
 921            json!({"code.rs": ""}),
 922            vec![(
 923                SERVER_1_ID.into(),
 924                ContextServerSettings::Extension {
 925                    enabled: true,
 926                    settings: json!({
 927                        "somevalue": true
 928                    }),
 929                },
 930            )],
 931        )
 932        .await;
 933
 934        let executor = cx.executor();
 935        let registry = cx.new(|cx| {
 936            let mut registry = ContextServerDescriptorRegistry::new();
 937            registry.register_context_server_descriptor(SERVER_1_ID.into(), fake_descriptor_1, cx);
 938            registry
 939        });
 940        let store = cx.new(|cx| {
 941            ContextServerStore::test_maintain_server_loop(
 942                Some(Box::new(move |id, _| {
 943                    Arc::new(ContextServer::new(
 944                        id.clone(),
 945                        Arc::new(create_fake_transport(id.0.to_string(), executor.clone())),
 946                    ))
 947                })),
 948                registry.clone(),
 949                project.read(cx).worktree_store(),
 950                project.downgrade(),
 951                cx,
 952            )
 953        });
 954
 955        // Ensure that mcp-1 starts up
 956        {
 957            let _server_events = assert_server_events(
 958                &store,
 959                vec![
 960                    (server_1_id.clone(), ContextServerStatus::Starting),
 961                    (server_1_id.clone(), ContextServerStatus::Running),
 962                ],
 963                cx,
 964            );
 965            cx.run_until_parked();
 966        }
 967
 968        // Ensure that mcp-1 is restarted when the configuration was changed
 969        {
 970            let _server_events = assert_server_events(
 971                &store,
 972                vec![
 973                    (server_1_id.clone(), ContextServerStatus::Stopped),
 974                    (server_1_id.clone(), ContextServerStatus::Starting),
 975                    (server_1_id.clone(), ContextServerStatus::Running),
 976                ],
 977                cx,
 978            );
 979            set_context_server_configuration(
 980                vec![(
 981                    server_1_id.0.clone(),
 982                    settings::ContextServerSettingsContent::Extension {
 983                        enabled: true,
 984                        settings: json!({
 985                            "somevalue": false
 986                        }),
 987                    },
 988                )],
 989                cx,
 990            );
 991
 992            cx.run_until_parked();
 993        }
 994
 995        // Ensure that mcp-1 is not restarted when the configuration was not changed
 996        {
 997            let _server_events = assert_server_events(&store, vec![], cx);
 998            set_context_server_configuration(
 999                vec![(
1000                    server_1_id.0.clone(),
1001                    settings::ContextServerSettingsContent::Extension {
1002                        enabled: true,
1003                        settings: json!({
1004                            "somevalue": false
1005                        }),
1006                    },
1007                )],
1008                cx,
1009            );
1010
1011            cx.run_until_parked();
1012        }
1013
1014        // Ensure that mcp-2 is started once it is added to the settings
1015        {
1016            let _server_events = assert_server_events(
1017                &store,
1018                vec![
1019                    (server_2_id.clone(), ContextServerStatus::Starting),
1020                    (server_2_id.clone(), ContextServerStatus::Running),
1021                ],
1022                cx,
1023            );
1024            set_context_server_configuration(
1025                vec![
1026                    (
1027                        server_1_id.0.clone(),
1028                        settings::ContextServerSettingsContent::Extension {
1029                            enabled: true,
1030                            settings: json!({
1031                                "somevalue": false
1032                            }),
1033                        },
1034                    ),
1035                    (
1036                        server_2_id.0.clone(),
1037                        settings::ContextServerSettingsContent::Stdio {
1038                            enabled: true,
1039                            command: ContextServerCommand {
1040                                path: "somebinary".into(),
1041                                args: vec!["arg".to_string()],
1042                                env: None,
1043                                timeout: None,
1044                            },
1045                        },
1046                    ),
1047                ],
1048                cx,
1049            );
1050
1051            cx.run_until_parked();
1052        }
1053
1054        // Ensure that mcp-2 is restarted once the args have changed
1055        {
1056            let _server_events = assert_server_events(
1057                &store,
1058                vec![
1059                    (server_2_id.clone(), ContextServerStatus::Stopped),
1060                    (server_2_id.clone(), ContextServerStatus::Starting),
1061                    (server_2_id.clone(), ContextServerStatus::Running),
1062                ],
1063                cx,
1064            );
1065            set_context_server_configuration(
1066                vec![
1067                    (
1068                        server_1_id.0.clone(),
1069                        settings::ContextServerSettingsContent::Extension {
1070                            enabled: true,
1071                            settings: json!({
1072                                "somevalue": false
1073                            }),
1074                        },
1075                    ),
1076                    (
1077                        server_2_id.0.clone(),
1078                        settings::ContextServerSettingsContent::Stdio {
1079                            enabled: true,
1080                            command: ContextServerCommand {
1081                                path: "somebinary".into(),
1082                                args: vec!["anotherArg".to_string()],
1083                                env: None,
1084                                timeout: None,
1085                            },
1086                        },
1087                    ),
1088                ],
1089                cx,
1090            );
1091
1092            cx.run_until_parked();
1093        }
1094
1095        // Ensure that mcp-2 is removed once it is removed from the settings
1096        {
1097            let _server_events = assert_server_events(
1098                &store,
1099                vec![(server_2_id.clone(), ContextServerStatus::Stopped)],
1100                cx,
1101            );
1102            set_context_server_configuration(
1103                vec![(
1104                    server_1_id.0.clone(),
1105                    settings::ContextServerSettingsContent::Extension {
1106                        enabled: true,
1107                        settings: json!({
1108                            "somevalue": false
1109                        }),
1110                    },
1111                )],
1112                cx,
1113            );
1114
1115            cx.run_until_parked();
1116
1117            cx.update(|cx| {
1118                assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
1119            });
1120        }
1121
1122        // Ensure that nothing happens if the settings do not change
1123        {
1124            let _server_events = assert_server_events(&store, vec![], cx);
1125            set_context_server_configuration(
1126                vec![(
1127                    server_1_id.0.clone(),
1128                    settings::ContextServerSettingsContent::Extension {
1129                        enabled: true,
1130                        settings: json!({
1131                            "somevalue": false
1132                        }),
1133                    },
1134                )],
1135                cx,
1136            );
1137
1138            cx.run_until_parked();
1139
1140            cx.update(|cx| {
1141                assert_eq!(
1142                    store.read(cx).status_for_server(&server_1_id),
1143                    Some(ContextServerStatus::Running)
1144                );
1145                assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
1146            });
1147        }
1148    }
1149
1150    #[gpui::test]
1151    async fn test_context_server_enabled_disabled(cx: &mut TestAppContext) {
1152        const SERVER_1_ID: &str = "mcp-1";
1153
1154        let server_1_id = ContextServerId(SERVER_1_ID.into());
1155
1156        let (_fs, project) = setup_context_server_test(
1157            cx,
1158            json!({"code.rs": ""}),
1159            vec![(
1160                SERVER_1_ID.into(),
1161                ContextServerSettings::Stdio {
1162                    enabled: true,
1163                    command: ContextServerCommand {
1164                        path: "somebinary".into(),
1165                        args: vec!["arg".to_string()],
1166                        env: None,
1167                        timeout: None,
1168                    },
1169                },
1170            )],
1171        )
1172        .await;
1173
1174        let executor = cx.executor();
1175        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1176        let store = cx.new(|cx| {
1177            ContextServerStore::test_maintain_server_loop(
1178                Some(Box::new(move |id, _| {
1179                    Arc::new(ContextServer::new(
1180                        id.clone(),
1181                        Arc::new(create_fake_transport(id.0.to_string(), executor.clone())),
1182                    ))
1183                })),
1184                registry.clone(),
1185                project.read(cx).worktree_store(),
1186                project.downgrade(),
1187                cx,
1188            )
1189        });
1190
1191        // Ensure that mcp-1 starts up
1192        {
1193            let _server_events = assert_server_events(
1194                &store,
1195                vec![
1196                    (server_1_id.clone(), ContextServerStatus::Starting),
1197                    (server_1_id.clone(), ContextServerStatus::Running),
1198                ],
1199                cx,
1200            );
1201            cx.run_until_parked();
1202        }
1203
1204        // Ensure that mcp-1 is stopped once it is disabled.
1205        {
1206            let _server_events = assert_server_events(
1207                &store,
1208                vec![(server_1_id.clone(), ContextServerStatus::Stopped)],
1209                cx,
1210            );
1211            set_context_server_configuration(
1212                vec![(
1213                    server_1_id.0.clone(),
1214                    settings::ContextServerSettingsContent::Stdio {
1215                        enabled: false,
1216                        command: ContextServerCommand {
1217                            path: "somebinary".into(),
1218                            args: vec!["arg".to_string()],
1219                            env: None,
1220                            timeout: None,
1221                        },
1222                    },
1223                )],
1224                cx,
1225            );
1226
1227            cx.run_until_parked();
1228        }
1229
1230        // Ensure that mcp-1 is started once it is enabled again.
1231        {
1232            let _server_events = assert_server_events(
1233                &store,
1234                vec![
1235                    (server_1_id.clone(), ContextServerStatus::Starting),
1236                    (server_1_id.clone(), ContextServerStatus::Running),
1237                ],
1238                cx,
1239            );
1240            set_context_server_configuration(
1241                vec![(
1242                    server_1_id.0.clone(),
1243                    settings::ContextServerSettingsContent::Stdio {
1244                        enabled: true,
1245                        command: ContextServerCommand {
1246                            path: "somebinary".into(),
1247                            args: vec!["arg".to_string()],
1248                            timeout: None,
1249                            env: None,
1250                        },
1251                    },
1252                )],
1253                cx,
1254            );
1255
1256            cx.run_until_parked();
1257        }
1258    }
1259
1260    fn set_context_server_configuration(
1261        context_servers: Vec<(Arc<str>, settings::ContextServerSettingsContent)>,
1262        cx: &mut TestAppContext,
1263    ) {
1264        cx.update(|cx| {
1265            SettingsStore::update_global(cx, |store, cx| {
1266                store.update_user_settings(cx, |content| {
1267                    content.project.context_servers.clear();
1268                    for (id, config) in context_servers {
1269                        content.project.context_servers.insert(id, config);
1270                    }
1271                });
1272            })
1273        });
1274    }
1275
1276    #[gpui::test]
1277    async fn test_remote_context_server(cx: &mut TestAppContext) {
1278        const SERVER_ID: &str = "remote-server";
1279        let server_id = ContextServerId(SERVER_ID.into());
1280        let server_url = "http://example.com/api";
1281
1282        let (_fs, project) = setup_context_server_test(
1283            cx,
1284            json!({ "code.rs": "" }),
1285            vec![(
1286                SERVER_ID.into(),
1287                ContextServerSettings::Http {
1288                    enabled: true,
1289                    url: server_url.to_string(),
1290                    headers: Default::default(),
1291                    timeout: None,
1292                },
1293            )],
1294        )
1295        .await;
1296
1297        let client = FakeHttpClient::create(|_| async move {
1298            use http_client::AsyncBody;
1299
1300            let response = Response::builder()
1301                .status(200)
1302                .header("Content-Type", "application/json")
1303                .body(AsyncBody::from(
1304                    serde_json::to_string(&json!({
1305                        "jsonrpc": "2.0",
1306                        "id": 0,
1307                        "result": {
1308                            "protocolVersion": "2024-11-05",
1309                            "capabilities": {},
1310                            "serverInfo": {
1311                                "name": "test-server",
1312                                "version": "1.0.0"
1313                            }
1314                        }
1315                    }))
1316                    .unwrap(),
1317                ))
1318                .unwrap();
1319            Ok(response)
1320        });
1321        cx.update(|cx| cx.set_http_client(client));
1322        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1323        let store = cx.new(|cx| {
1324            ContextServerStore::test_maintain_server_loop(
1325                None,
1326                registry.clone(),
1327                project.read(cx).worktree_store(),
1328                project.downgrade(),
1329                cx,
1330            )
1331        });
1332
1333        let _server_events = assert_server_events(
1334            &store,
1335            vec![
1336                (server_id.clone(), ContextServerStatus::Starting),
1337                (server_id.clone(), ContextServerStatus::Running),
1338            ],
1339            cx,
1340        );
1341        cx.run_until_parked();
1342    }
1343
1344    struct ServerEvents {
1345        received_event_count: Rc<RefCell<usize>>,
1346        expected_event_count: usize,
1347        _subscription: Subscription,
1348    }
1349
1350    impl Drop for ServerEvents {
1351        fn drop(&mut self) {
1352            let actual_event_count = *self.received_event_count.borrow();
1353            assert_eq!(
1354                actual_event_count, self.expected_event_count,
1355                "
1356                Expected to receive {} context server store events, but received {} events",
1357                self.expected_event_count, actual_event_count
1358            );
1359        }
1360    }
1361
1362    #[gpui::test]
1363    async fn test_context_server_global_timeout(cx: &mut TestAppContext) {
1364        cx.update(|cx| {
1365            let settings_store = SettingsStore::test(cx);
1366            cx.set_global(settings_store);
1367            SettingsStore::update_global(cx, |store, cx| {
1368                store
1369                    .set_user_settings(r#"{"context_server_timeout": 90}"#, cx)
1370                    .expect("Failed to set test user settings");
1371            });
1372        });
1373
1374        let (_fs, project) = setup_context_server_test(cx, json!({"code.rs": ""}), vec![]).await;
1375
1376        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1377        let store = cx.new(|cx| {
1378            ContextServerStore::test(
1379                registry.clone(),
1380                project.read(cx).worktree_store(),
1381                project.downgrade(),
1382                cx,
1383            )
1384        });
1385
1386        let result = store.update(cx, |store, cx| {
1387            store.create_context_server(
1388                ContextServerId("test-server".into()),
1389                Arc::new(ContextServerConfiguration::Http {
1390                    url: url::Url::parse("http://localhost:8080")
1391                        .expect("Failed to parse test URL"),
1392                    headers: Default::default(),
1393                    timeout: None,
1394                }),
1395                cx,
1396            )
1397        });
1398
1399        assert!(
1400            result.is_ok(),
1401            "Server should be created successfully with global timeout"
1402        );
1403    }
1404
1405    #[gpui::test]
1406    async fn test_context_server_per_server_timeout_override(cx: &mut TestAppContext) {
1407        const SERVER_ID: &str = "test-server";
1408
1409        cx.update(|cx| {
1410            let settings_store = SettingsStore::test(cx);
1411            cx.set_global(settings_store);
1412            SettingsStore::update_global(cx, |store, cx| {
1413                store
1414                    .set_user_settings(r#"{"context_server_timeout": 60}"#, cx)
1415                    .expect("Failed to set test user settings");
1416            });
1417        });
1418
1419        let (_fs, project) = setup_context_server_test(
1420            cx,
1421            json!({"code.rs": ""}),
1422            vec![(
1423                SERVER_ID.into(),
1424                ContextServerSettings::Http {
1425                    enabled: true,
1426                    url: "http://localhost:8080".to_string(),
1427                    headers: Default::default(),
1428                    timeout: Some(120),
1429                },
1430            )],
1431        )
1432        .await;
1433
1434        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1435        let store = cx.new(|cx| {
1436            ContextServerStore::test(
1437                registry.clone(),
1438                project.read(cx).worktree_store(),
1439                project.downgrade(),
1440                cx,
1441            )
1442        });
1443
1444        let result = store.update(cx, |store, cx| {
1445            store.create_context_server(
1446                ContextServerId("test-server".into()),
1447                Arc::new(ContextServerConfiguration::Http {
1448                    url: url::Url::parse("http://localhost:8080")
1449                        .expect("Failed to parse test URL"),
1450                    headers: Default::default(),
1451                    timeout: Some(120),
1452                }),
1453                cx,
1454            )
1455        });
1456
1457        assert!(
1458            result.is_ok(),
1459            "Server should be created successfully with per-server timeout override"
1460        );
1461    }
1462
1463    #[gpui::test]
1464    async fn test_context_server_stdio_timeout(cx: &mut TestAppContext) {
1465        const SERVER_ID: &str = "stdio-server";
1466
1467        let (_fs, project) = setup_context_server_test(
1468            cx,
1469            json!({"code.rs": ""}),
1470            vec![(
1471                SERVER_ID.into(),
1472                ContextServerSettings::Stdio {
1473                    enabled: true,
1474                    command: ContextServerCommand {
1475                        path: "/usr/bin/node".into(),
1476                        args: vec!["server.js".into()],
1477                        env: None,
1478                        timeout: Some(180000),
1479                    },
1480                },
1481            )],
1482        )
1483        .await;
1484
1485        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1486        let store = cx.new(|cx| {
1487            ContextServerStore::test(
1488                registry.clone(),
1489                project.read(cx).worktree_store(),
1490                project.downgrade(),
1491                cx,
1492            )
1493        });
1494
1495        let result = store.update(cx, |store, cx| {
1496            store.create_context_server(
1497                ContextServerId("stdio-server".into()),
1498                Arc::new(ContextServerConfiguration::Custom {
1499                    command: ContextServerCommand {
1500                        path: "/usr/bin/node".into(),
1501                        args: vec!["server.js".into()],
1502                        env: None,
1503                        timeout: Some(180000),
1504                    },
1505                }),
1506                cx,
1507            )
1508        });
1509
1510        assert!(
1511            result.is_ok(),
1512            "Stdio server should be created successfully with timeout"
1513        );
1514    }
1515
1516    fn dummy_server_settings() -> ContextServerSettings {
1517        ContextServerSettings::Stdio {
1518            enabled: true,
1519            command: ContextServerCommand {
1520                path: "somebinary".into(),
1521                args: vec!["arg".to_string()],
1522                env: None,
1523                timeout: None,
1524            },
1525        }
1526    }
1527
1528    fn assert_server_events(
1529        store: &Entity<ContextServerStore>,
1530        expected_events: Vec<(ContextServerId, ContextServerStatus)>,
1531        cx: &mut TestAppContext,
1532    ) -> ServerEvents {
1533        cx.update(|cx| {
1534            let mut ix = 0;
1535            let received_event_count = Rc::new(RefCell::new(0));
1536            let expected_event_count = expected_events.len();
1537            let subscription = cx.subscribe(store, {
1538                let received_event_count = received_event_count.clone();
1539                move |_, event, _| match event {
1540                    Event::ServerStatusChanged {
1541                        server_id: actual_server_id,
1542                        status: actual_status,
1543                    } => {
1544                        let (expected_server_id, expected_status) = &expected_events[ix];
1545
1546                        assert_eq!(
1547                            actual_server_id, expected_server_id,
1548                            "Expected different server id at index {}",
1549                            ix
1550                        );
1551                        assert_eq!(
1552                            actual_status, expected_status,
1553                            "Expected different status at index {}",
1554                            ix
1555                        );
1556                        ix += 1;
1557                        *received_event_count.borrow_mut() += 1;
1558                    }
1559                }
1560            });
1561            ServerEvents {
1562                expected_event_count,
1563                received_event_count,
1564                _subscription: subscription,
1565            }
1566        })
1567    }
1568
1569    async fn setup_context_server_test(
1570        cx: &mut TestAppContext,
1571        files: serde_json::Value,
1572        context_server_configurations: Vec<(Arc<str>, ContextServerSettings)>,
1573    ) -> (Arc<FakeFs>, Entity<Project>) {
1574        cx.update(|cx| {
1575            let settings_store = SettingsStore::test(cx);
1576            cx.set_global(settings_store);
1577            let mut settings = ProjectSettings::get_global(cx).clone();
1578            for (id, config) in context_server_configurations {
1579                settings.context_servers.insert(id, config);
1580            }
1581            ProjectSettings::override_global(settings, cx);
1582        });
1583
1584        let fs = FakeFs::new(cx.executor());
1585        fs.insert_tree(path!("/test"), files).await;
1586        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
1587
1588        (fs, project)
1589    }
1590
1591    struct FakeContextServerDescriptor {
1592        path: PathBuf,
1593    }
1594
1595    impl FakeContextServerDescriptor {
1596        fn new(path: impl Into<PathBuf>) -> Self {
1597            Self { path: path.into() }
1598        }
1599    }
1600
1601    impl ContextServerDescriptor for FakeContextServerDescriptor {
1602        fn command(
1603            &self,
1604            _worktree_store: Entity<WorktreeStore>,
1605            _cx: &AsyncApp,
1606        ) -> Task<Result<ContextServerCommand>> {
1607            Task::ready(Ok(ContextServerCommand {
1608                path: self.path.clone(),
1609                args: vec!["arg1".to_string(), "arg2".to_string()],
1610                env: None,
1611                timeout: None,
1612            }))
1613        }
1614
1615        fn configuration(
1616            &self,
1617            _worktree_store: Entity<WorktreeStore>,
1618            _cx: &AsyncApp,
1619        ) -> Task<Result<Option<::extension::ContextServerConfiguration>>> {
1620            Task::ready(Ok(None))
1621        }
1622    }
1623}