1pub mod extension;
2pub mod registry;
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{Context as _, Result};
8use collections::{HashMap, HashSet};
9use context_server::{ContextServer, ContextServerCommand, ContextServerId};
10use futures::{FutureExt as _, future::join_all};
11use gpui::{App, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, actions};
12use registry::ContextServerDescriptorRegistry;
13use settings::{Settings as _, SettingsStore};
14use util::{ResultExt as _, rel_path::RelPath};
15
16use crate::{
17 Project,
18 project_settings::{ContextServerSettings, ProjectSettings},
19 worktree_store::WorktreeStore,
20};
21
/// Upper bound, in seconds, applied to every context server timeout.
///
/// Both per-server timeouts and the global default are clamped to this value
/// when a server is created, which prevents extremely large timeout values
/// from tying up resources indefinitely.
const MAX_TIMEOUT_SECS: u64 = 600; // 10 minutes
25
/// Initializes this module by delegating to [`extension::init`].
pub fn init(cx: &mut App) {
    extension::init(cx);
}
29
// Declares the actions exposed by this module; currently only `Restart`.
actions!(
    context_server,
    [
        /// Restarts the context server.
        Restart
    ]
);
37
/// Externally visible lifecycle status of a context server.
///
/// This is the payload-free counterpart of the store's internal
/// `ContextServerState` and is what [`Event::ServerStatusChanged`] carries.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ContextServerStatus {
    /// The server's start-up has been kicked off but has not finished yet.
    Starting,
    /// The server started successfully.
    Running,
    /// The server was stopped (or removed from the store).
    Stopped,
    /// The server failed to start; carries the error message.
    Error(Arc<str>),
}
45
46impl ContextServerStatus {
47 fn from_state(state: &ContextServerState) -> Self {
48 match state {
49 ContextServerState::Starting { .. } => ContextServerStatus::Starting,
50 ContextServerState::Running { .. } => ContextServerStatus::Running,
51 ContextServerState::Stopped { .. } => ContextServerStatus::Stopped,
52 ContextServerState::Error { error, .. } => ContextServerStatus::Error(error.clone()),
53 }
54 }
55}
56
/// Internal bookkeeping for a single context server tracked by the store.
///
/// Every variant keeps the server handle and the configuration it was created
/// from, so both stay available regardless of the lifecycle phase.
enum ContextServerState {
    /// Start-up is in flight.
    Starting {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
        // Task driving the start-up; owned here so it lives exactly as long as
        // this state. NOTE(review): presumably dropping it cancels the
        // start-up — confirm against gpui's `Task` semantics.
        _task: Task<()>,
    },
    /// Start-up completed successfully.
    Running {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
    },
    /// The server was stopped but is still tracked by the store.
    Stopped {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
    },
    /// Start-up failed with `error`.
    Error {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
        error: Arc<str>,
    },
}
77
78impl ContextServerState {
79 pub fn server(&self) -> Arc<ContextServer> {
80 match self {
81 ContextServerState::Starting { server, .. } => server.clone(),
82 ContextServerState::Running { server, .. } => server.clone(),
83 ContextServerState::Stopped { server, .. } => server.clone(),
84 ContextServerState::Error { server, .. } => server.clone(),
85 }
86 }
87
88 pub fn configuration(&self) -> Arc<ContextServerConfiguration> {
89 match self {
90 ContextServerState::Starting { configuration, .. } => configuration.clone(),
91 ContextServerState::Running { configuration, .. } => configuration.clone(),
92 ContextServerState::Stopped { configuration, .. } => configuration.clone(),
93 ContextServerState::Error { configuration, .. } => configuration.clone(),
94 }
95 }
96}
97
/// Fully resolved configuration from which a context server is created.
///
/// Produced from `ContextServerSettings` by
/// [`ContextServerConfiguration::from_settings`].
#[derive(Debug, PartialEq, Eq)]
pub enum ContextServerConfiguration {
    /// A stdio server whose command comes directly from the settings.
    Custom {
        command: ContextServerCommand,
    },
    /// A server whose command is supplied by a registered descriptor,
    /// together with the settings value taken from the settings entry.
    Extension {
        command: ContextServerCommand,
        settings: serde_json::Value,
    },
    /// A server reached over HTTP.
    Http {
        url: url::Url,
        headers: HashMap<String, String>,
        /// Optional per-server timeout in seconds; when absent the global
        /// default is used at creation time.
        timeout: Option<u64>,
    },
}
113
114impl ContextServerConfiguration {
115 pub fn command(&self) -> Option<&ContextServerCommand> {
116 match self {
117 ContextServerConfiguration::Custom { command } => Some(command),
118 ContextServerConfiguration::Extension { command, .. } => Some(command),
119 ContextServerConfiguration::Http { .. } => None,
120 }
121 }
122
123 pub async fn from_settings(
124 settings: ContextServerSettings,
125 id: ContextServerId,
126 registry: Entity<ContextServerDescriptorRegistry>,
127 worktree_store: Entity<WorktreeStore>,
128 cx: &AsyncApp,
129 ) -> Option<Self> {
130 match settings {
131 ContextServerSettings::Stdio {
132 enabled: _,
133 command,
134 } => Some(ContextServerConfiguration::Custom { command }),
135 ContextServerSettings::Extension {
136 enabled: _,
137 settings,
138 } => {
139 let descriptor = cx
140 .update(|cx| registry.read(cx).context_server_descriptor(&id.0))
141 .ok()
142 .flatten()?;
143
144 match descriptor.command(worktree_store, cx).await {
145 Ok(command) => {
146 Some(ContextServerConfiguration::Extension { command, settings })
147 }
148 Err(e) => {
149 log::error!(
150 "Failed to create context server configuration from settings: {e:#}"
151 );
152 None
153 }
154 }
155 }
156 ContextServerSettings::Http {
157 enabled: _,
158 url,
159 headers: auth,
160 timeout,
161 } => {
162 let url = url::Url::parse(&url).log_err()?;
163 Some(ContextServerConfiguration::Http {
164 url,
165 headers: auth,
166 timeout,
167 })
168 }
169 }
170 }
171}
172
/// Callback that builds a [`ContextServer`] from an id and its configuration.
///
/// When the store holds a factory, it is used instead of the built-in
/// HTTP/stdio construction.
pub type ContextServerFactory =
    Box<dyn Fn(ContextServerId, Arc<ContextServerConfiguration>) -> Arc<ContextServer>>;
175
/// Tracks the context servers of a project and keeps them in sync with the
/// project settings and the descriptor registry.
pub struct ContextServerStore {
    /// Settings per server id, as last resolved from the project settings.
    context_server_settings: HashMap<Arc<str>, ContextServerSettings>,
    /// Lifecycle state of every server currently tracked by the store.
    servers: HashMap<ContextServerId, ContextServerState>,
    /// Used to pick the settings location and the root path for stdio servers.
    worktree_store: Entity<WorktreeStore>,
    /// Weak handle to the owning project; consulted for the active project
    /// directory when creating stdio servers.
    project: WeakEntity<Project>,
    /// Registry of descriptors for extension-provided servers.
    registry: Entity<ContextServerDescriptorRegistry>,
    /// The in-flight reconciliation run, if any.
    update_servers_task: Option<Task<Result<()>>>,
    /// Optional override for how servers are constructed.
    context_server_factory: Option<ContextServerFactory>,
    /// Set when a change arrives while a reconciliation run is in flight, so
    /// that another run is scheduled once the current one finishes.
    needs_server_update: bool,
    /// Observer subscriptions owned by the store.
    /// NOTE(review): presumably dropping them unsubscribes — confirm.
    _subscriptions: Vec<Subscription>,
}
187
/// Events emitted by [`ContextServerStore`].
pub enum Event {
    /// The status of the server identified by `server_id` changed to `status`.
    ServerStatusChanged {
        server_id: ContextServerId,
        status: ContextServerStatus,
    },
}

// Marks the store as an emitter of `Event`.
impl EventEmitter<Event> for ContextServerStore {}
196
197impl ContextServerStore {
198 pub fn new(
199 worktree_store: Entity<WorktreeStore>,
200 weak_project: WeakEntity<Project>,
201 cx: &mut Context<Self>,
202 ) -> Self {
203 Self::new_internal(
204 true,
205 None,
206 ContextServerDescriptorRegistry::default_global(cx),
207 worktree_store,
208 weak_project,
209 cx,
210 )
211 }
212
213 /// Returns all configured context server ids, excluding the ones that are disabled
214 pub fn configured_server_ids(&self) -> Vec<ContextServerId> {
215 self.context_server_settings
216 .iter()
217 .filter(|(_, settings)| settings.enabled())
218 .map(|(id, _)| ContextServerId(id.clone()))
219 .collect()
220 }
221
222 #[cfg(any(test, feature = "test-support"))]
223 pub fn test(
224 registry: Entity<ContextServerDescriptorRegistry>,
225 worktree_store: Entity<WorktreeStore>,
226 weak_project: WeakEntity<Project>,
227 cx: &mut Context<Self>,
228 ) -> Self {
229 Self::new_internal(false, None, registry, worktree_store, weak_project, cx)
230 }
231
232 #[cfg(any(test, feature = "test-support"))]
233 pub fn test_maintain_server_loop(
234 context_server_factory: Option<ContextServerFactory>,
235 registry: Entity<ContextServerDescriptorRegistry>,
236 worktree_store: Entity<WorktreeStore>,
237 weak_project: WeakEntity<Project>,
238 cx: &mut Context<Self>,
239 ) -> Self {
240 Self::new_internal(
241 true,
242 context_server_factory,
243 registry,
244 worktree_store,
245 weak_project,
246 cx,
247 )
248 }
249
250 fn new_internal(
251 maintain_server_loop: bool,
252 context_server_factory: Option<ContextServerFactory>,
253 registry: Entity<ContextServerDescriptorRegistry>,
254 worktree_store: Entity<WorktreeStore>,
255 weak_project: WeakEntity<Project>,
256 cx: &mut Context<Self>,
257 ) -> Self {
258 let subscriptions = if maintain_server_loop {
259 vec![
260 cx.observe(®istry, |this, _registry, cx| {
261 this.available_context_servers_changed(cx);
262 }),
263 cx.observe_global::<SettingsStore>(|this, cx| {
264 let settings =
265 &Self::resolve_project_settings(&this.worktree_store, cx).context_servers;
266 if &this.context_server_settings == settings {
267 return;
268 }
269 this.context_server_settings = settings.clone();
270 this.available_context_servers_changed(cx);
271 }),
272 ]
273 } else {
274 Vec::new()
275 };
276
277 let mut this = Self {
278 _subscriptions: subscriptions,
279 context_server_settings: Self::resolve_project_settings(&worktree_store, cx)
280 .context_servers
281 .clone(),
282 worktree_store,
283 project: weak_project,
284 registry,
285 needs_server_update: false,
286 servers: HashMap::default(),
287 update_servers_task: None,
288 context_server_factory,
289 };
290 if maintain_server_loop {
291 this.available_context_servers_changed(cx);
292 }
293 this
294 }
295
296 pub fn get_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
297 self.servers.get(id).map(|state| state.server())
298 }
299
300 pub fn get_running_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
301 if let Some(ContextServerState::Running { server, .. }) = self.servers.get(id) {
302 Some(server.clone())
303 } else {
304 None
305 }
306 }
307
308 pub fn status_for_server(&self, id: &ContextServerId) -> Option<ContextServerStatus> {
309 self.servers.get(id).map(ContextServerStatus::from_state)
310 }
311
312 pub fn configuration_for_server(
313 &self,
314 id: &ContextServerId,
315 ) -> Option<Arc<ContextServerConfiguration>> {
316 self.servers.get(id).map(|state| state.configuration())
317 }
318
319 pub fn server_ids(&self, cx: &App) -> HashSet<ContextServerId> {
320 self.servers
321 .keys()
322 .cloned()
323 .chain(
324 self.registry
325 .read(cx)
326 .context_server_descriptors()
327 .into_iter()
328 .map(|(id, _)| ContextServerId(id)),
329 )
330 .collect()
331 }
332
333 pub fn running_servers(&self) -> Vec<Arc<ContextServer>> {
334 self.servers
335 .values()
336 .filter_map(|state| {
337 if let ContextServerState::Running { server, .. } = state {
338 Some(server.clone())
339 } else {
340 None
341 }
342 })
343 .collect()
344 }
345
346 pub fn start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
347 cx.spawn(async move |this, cx| {
348 let this = this.upgrade().context("Context server store dropped")?;
349 let settings = this
350 .update(cx, |this, _| {
351 this.context_server_settings.get(&server.id().0).cloned()
352 })
353 .ok()
354 .flatten()
355 .context("Failed to get context server settings")?;
356
357 if !settings.enabled() {
358 return Ok(());
359 }
360
361 let (registry, worktree_store) = this.update(cx, |this, _| {
362 (this.registry.clone(), this.worktree_store.clone())
363 })?;
364 let configuration = ContextServerConfiguration::from_settings(
365 settings,
366 server.id(),
367 registry,
368 worktree_store,
369 cx,
370 )
371 .await
372 .context("Failed to create context server configuration")?;
373
374 this.update(cx, |this, cx| {
375 this.run_server(server, Arc::new(configuration), cx)
376 })
377 })
378 .detach_and_log_err(cx);
379 }
380
381 pub fn stop_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
382 if matches!(
383 self.servers.get(id),
384 Some(ContextServerState::Stopped { .. })
385 ) {
386 return Ok(());
387 }
388
389 let state = self
390 .servers
391 .remove(id)
392 .context("Context server not found")?;
393
394 let server = state.server();
395 let configuration = state.configuration();
396 let mut result = Ok(());
397 if let ContextServerState::Running { server, .. } = &state {
398 result = server.stop();
399 }
400 drop(state);
401
402 self.update_server_state(
403 id.clone(),
404 ContextServerState::Stopped {
405 configuration,
406 server,
407 },
408 cx,
409 );
410
411 result
412 }
413
414 fn run_server(
415 &mut self,
416 server: Arc<ContextServer>,
417 configuration: Arc<ContextServerConfiguration>,
418 cx: &mut Context<Self>,
419 ) {
420 let id = server.id();
421 if matches!(
422 self.servers.get(&id),
423 Some(ContextServerState::Starting { .. } | ContextServerState::Running { .. })
424 ) {
425 self.stop_server(&id, cx).log_err();
426 }
427 let task = cx.spawn({
428 let id = server.id();
429 let server = server.clone();
430 let configuration = configuration.clone();
431
432 async move |this, cx| {
433 match server.clone().start(cx).await {
434 Ok(_) => {
435 debug_assert!(server.client().is_some());
436
437 this.update(cx, |this, cx| {
438 this.update_server_state(
439 id.clone(),
440 ContextServerState::Running {
441 server,
442 configuration,
443 },
444 cx,
445 )
446 })
447 .log_err()
448 }
449 Err(err) => {
450 log::error!("{} context server failed to start: {}", id, err);
451 this.update(cx, |this, cx| {
452 this.update_server_state(
453 id.clone(),
454 ContextServerState::Error {
455 configuration,
456 server,
457 error: err.to_string().into(),
458 },
459 cx,
460 )
461 })
462 .log_err()
463 }
464 };
465 }
466 });
467
468 self.update_server_state(
469 id.clone(),
470 ContextServerState::Starting {
471 configuration,
472 _task: task,
473 server,
474 },
475 cx,
476 );
477 }
478
479 fn remove_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
480 let state = self
481 .servers
482 .remove(id)
483 .context("Context server not found")?;
484 drop(state);
485 cx.emit(Event::ServerStatusChanged {
486 server_id: id.clone(),
487 status: ContextServerStatus::Stopped,
488 });
489 Ok(())
490 }
491
492 fn create_context_server(
493 &self,
494 id: ContextServerId,
495 configuration: Arc<ContextServerConfiguration>,
496 cx: &mut Context<Self>,
497 ) -> Result<Arc<ContextServer>> {
498 let global_timeout =
499 Self::resolve_project_settings(&self.worktree_store, cx).context_server_timeout;
500
501 if let Some(factory) = self.context_server_factory.as_ref() {
502 return Ok(factory(id, configuration));
503 }
504
505 match configuration.as_ref() {
506 ContextServerConfiguration::Http {
507 url,
508 headers,
509 timeout,
510 } => Ok(Arc::new(ContextServer::http(
511 id,
512 url,
513 headers.clone(),
514 cx.http_client(),
515 cx.background_executor().clone(),
516 Some(Duration::from_secs(
517 timeout.unwrap_or(global_timeout).min(MAX_TIMEOUT_SECS),
518 )),
519 )?)),
520 _ => {
521 let root_path = self
522 .project
523 .read_with(cx, |project, cx| project.active_project_directory(cx))
524 .ok()
525 .flatten()
526 .or_else(|| {
527 self.worktree_store.read_with(cx, |store, cx| {
528 store.visible_worktrees(cx).fold(None, |acc, item| {
529 if acc.is_none() {
530 item.read(cx).root_dir()
531 } else {
532 acc
533 }
534 })
535 })
536 });
537
538 let mut command = configuration
539 .command()
540 .context("Missing command configuration for stdio context server")?
541 .clone();
542 command.timeout = Some(
543 command
544 .timeout
545 .unwrap_or(global_timeout)
546 .min(MAX_TIMEOUT_SECS),
547 );
548
549 Ok(Arc::new(ContextServer::stdio(id, command, root_path)))
550 }
551 }
552 }
553
554 fn resolve_project_settings<'a>(
555 worktree_store: &'a Entity<WorktreeStore>,
556 cx: &'a App,
557 ) -> &'a ProjectSettings {
558 let location = worktree_store
559 .read(cx)
560 .visible_worktrees(cx)
561 .next()
562 .map(|worktree| settings::SettingsLocation {
563 worktree_id: worktree.read(cx).id(),
564 path: RelPath::empty(),
565 });
566 ProjectSettings::get(location, cx)
567 }
568
569 fn update_server_state(
570 &mut self,
571 id: ContextServerId,
572 state: ContextServerState,
573 cx: &mut Context<Self>,
574 ) {
575 let status = ContextServerStatus::from_state(&state);
576 self.servers.insert(id.clone(), state);
577 cx.emit(Event::ServerStatusChanged {
578 server_id: id,
579 status,
580 });
581 }
582
583 fn available_context_servers_changed(&mut self, cx: &mut Context<Self>) {
584 if self.update_servers_task.is_some() {
585 self.needs_server_update = true;
586 } else {
587 self.needs_server_update = false;
588 self.update_servers_task = Some(cx.spawn(async move |this, cx| {
589 if let Err(err) = Self::maintain_servers(this.clone(), cx).await {
590 log::error!("Error maintaining context servers: {}", err);
591 }
592
593 this.update(cx, |this, cx| {
594 this.update_servers_task.take();
595 if this.needs_server_update {
596 this.available_context_servers_changed(cx);
597 }
598 })?;
599
600 Ok(())
601 }));
602 }
603 }
604
605 async fn maintain_servers(this: WeakEntity<Self>, cx: &mut AsyncApp) -> Result<()> {
606 let (mut configured_servers, registry, worktree_store) = this.update(cx, |this, _| {
607 (
608 this.context_server_settings.clone(),
609 this.registry.clone(),
610 this.worktree_store.clone(),
611 )
612 })?;
613
614 for (id, _) in
615 registry.read_with(cx, |registry, _| registry.context_server_descriptors())?
616 {
617 configured_servers
618 .entry(id)
619 .or_insert(ContextServerSettings::default_extension());
620 }
621
622 let (enabled_servers, disabled_servers): (HashMap<_, _>, HashMap<_, _>) =
623 configured_servers
624 .into_iter()
625 .partition(|(_, settings)| settings.enabled());
626
627 let configured_servers = join_all(enabled_servers.into_iter().map(|(id, settings)| {
628 let id = ContextServerId(id);
629 ContextServerConfiguration::from_settings(
630 settings,
631 id.clone(),
632 registry.clone(),
633 worktree_store.clone(),
634 cx,
635 )
636 .map(|config| (id, config))
637 }))
638 .await
639 .into_iter()
640 .filter_map(|(id, config)| config.map(|config| (id, config)))
641 .collect::<HashMap<_, _>>();
642
643 let mut servers_to_start = Vec::new();
644 let mut servers_to_remove = HashSet::default();
645 let mut servers_to_stop = HashSet::default();
646
647 this.update(cx, |this, cx| {
648 for server_id in this.servers.keys() {
649 // All servers that are not in desired_servers should be removed from the store.
650 // This can happen if the user removed a server from the context server settings.
651 if !configured_servers.contains_key(server_id) {
652 if disabled_servers.contains_key(&server_id.0) {
653 servers_to_stop.insert(server_id.clone());
654 } else {
655 servers_to_remove.insert(server_id.clone());
656 }
657 }
658 }
659
660 for (id, config) in configured_servers {
661 let state = this.servers.get(&id);
662 let is_stopped = matches!(state, Some(ContextServerState::Stopped { .. }));
663 let existing_config = state.as_ref().map(|state| state.configuration());
664 if existing_config.as_deref() != Some(&config) || is_stopped {
665 let config = Arc::new(config);
666 let server = this.create_context_server(id.clone(), config.clone(), cx)?;
667 servers_to_start.push((server, config));
668 if this.servers.contains_key(&id) {
669 servers_to_stop.insert(id);
670 }
671 }
672 }
673
674 anyhow::Ok(())
675 })??;
676
677 this.update(cx, |this, cx| {
678 for id in servers_to_stop {
679 this.stop_server(&id, cx)?;
680 }
681 for id in servers_to_remove {
682 this.remove_server(&id, cx)?;
683 }
684 for (server, config) in servers_to_start {
685 this.run_server(server, config, cx);
686 }
687 anyhow::Ok(())
688 })?
689 }
690}
691
692#[cfg(test)]
693mod tests {
694 use super::*;
695 use crate::{
696 FakeFs, Project, context_server_store::registry::ContextServerDescriptor,
697 project_settings::ProjectSettings,
698 };
699 use context_server::test::create_fake_transport;
700 use gpui::{AppContext, TestAppContext, UpdateGlobal as _};
701 use http_client::{FakeHttpClient, Response};
702 use serde_json::json;
703 use std::{cell::RefCell, path::PathBuf, rc::Rc};
704 use util::path;
705
    /// Checks the status reported by `status_for_server` while two servers
    /// are started one after the other and the second one is stopped again.
    #[gpui::test]
    async fn test_context_server_status(cx: &mut TestAppContext) {
        const SERVER_1_ID: &str = "mcp-1";
        const SERVER_2_ID: &str = "mcp-2";

        let (_fs, project) = setup_context_server_test(
            cx,
            json!({"code.rs": ""}),
            vec![
                (SERVER_1_ID.into(), dummy_server_settings()),
                (SERVER_2_ID.into(), dummy_server_settings()),
            ],
        )
        .await;

        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
        // `ContextServerStore::test` disables the maintenance loop, so servers
        // only change state through the explicit calls below.
        let store = cx.new(|cx| {
            ContextServerStore::test(
                registry.clone(),
                project.read(cx).worktree_store(),
                project.downgrade(),
                cx,
            )
        });

        let server_1_id = ContextServerId(SERVER_1_ID.into());
        let server_2_id = ContextServerId(SERVER_2_ID.into());

        let server_1 = Arc::new(ContextServer::new(
            server_1_id.clone(),
            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
        ));
        let server_2 = Arc::new(ContextServer::new(
            server_2_id.clone(),
            Arc::new(create_fake_transport(SERVER_2_ID, cx.executor())),
        ));

        store.update(cx, |store, cx| store.start_server(server_1, cx));

        cx.run_until_parked();

        // Only the first server has been started so far.
        cx.update(|cx| {
            assert_eq!(
                store.read(cx).status_for_server(&server_1_id),
                Some(ContextServerStatus::Running)
            );
            assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
        });

        store.update(cx, |store, cx| store.start_server(server_2.clone(), cx));

        cx.run_until_parked();

        cx.update(|cx| {
            assert_eq!(
                store.read(cx).status_for_server(&server_1_id),
                Some(ContextServerStatus::Running)
            );
            assert_eq!(
                store.read(cx).status_for_server(&server_2_id),
                Some(ContextServerStatus::Running)
            );
        });

        store
            .update(cx, |store, cx| store.stop_server(&server_2_id, cx))
            .unwrap();

        // Stopping the second server must not affect the first one.
        cx.update(|cx| {
            assert_eq!(
                store.read(cx).status_for_server(&server_1_id),
                Some(ContextServerStatus::Running)
            );
            assert_eq!(
                store.read(cx).status_for_server(&server_2_id),
                Some(ContextServerStatus::Stopped)
            );
        });
    }
785
    /// Checks the sequence of status events produced while two servers are
    /// started one after the other and the second one is stopped again.
    #[gpui::test]
    async fn test_context_server_status_events(cx: &mut TestAppContext) {
        const SERVER_1_ID: &str = "mcp-1";
        const SERVER_2_ID: &str = "mcp-2";

        let (_fs, project) = setup_context_server_test(
            cx,
            json!({"code.rs": ""}),
            vec![
                (SERVER_1_ID.into(), dummy_server_settings()),
                (SERVER_2_ID.into(), dummy_server_settings()),
            ],
        )
        .await;

        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
        let store = cx.new(|cx| {
            ContextServerStore::test(
                registry.clone(),
                project.read(cx).worktree_store(),
                project.downgrade(),
                cx,
            )
        });

        let server_1_id = ContextServerId(SERVER_1_ID.into());
        let server_2_id = ContextServerId(SERVER_2_ID.into());

        let server_1 = Arc::new(ContextServer::new(
            server_1_id.clone(),
            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
        ));
        let server_2 = Arc::new(ContextServer::new(
            server_2_id.clone(),
            Arc::new(create_fake_transport(SERVER_2_ID, cx.executor())),
        ));

        // NOTE(review): `assert_server_events` is defined outside this view;
        // presumably the returned value has to stay alive for the expected
        // events to be checked — confirm.
        let _server_events = assert_server_events(
            &store,
            vec![
                (server_1_id.clone(), ContextServerStatus::Starting),
                (server_1_id, ContextServerStatus::Running),
                (server_2_id.clone(), ContextServerStatus::Starting),
                (server_2_id.clone(), ContextServerStatus::Running),
                (server_2_id.clone(), ContextServerStatus::Stopped),
            ],
            cx,
        );

        store.update(cx, |store, cx| store.start_server(server_1, cx));

        cx.run_until_parked();

        store.update(cx, |store, cx| store.start_server(server_2.clone(), cx));

        cx.run_until_parked();

        store
            .update(cx, |store, cx| store.stop_server(&server_2_id, cx))
            .unwrap();
    }
847
    /// Starts two servers sharing one id back to back and checks that the
    /// first start is reported as stopped before the second one runs.
    #[gpui::test(iterations = 25)]
    async fn test_context_server_concurrent_starts(cx: &mut TestAppContext) {
        const SERVER_1_ID: &str = "mcp-1";

        let (_fs, project) = setup_context_server_test(
            cx,
            json!({"code.rs": ""}),
            vec![(SERVER_1_ID.into(), dummy_server_settings())],
        )
        .await;

        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
        let store = cx.new(|cx| {
            ContextServerStore::test(
                registry.clone(),
                project.read(cx).worktree_store(),
                project.downgrade(),
                cx,
            )
        });

        let server_id = ContextServerId(SERVER_1_ID.into());

        let server_with_same_id_1 = Arc::new(ContextServer::new(
            server_id.clone(),
            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
        ));
        let server_with_same_id_2 = Arc::new(ContextServer::new(
            server_id.clone(),
            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
        ));

        // If we start another server with the same id, we should report that we stopped the previous one
        let _server_events = assert_server_events(
            &store,
            vec![
                (server_id.clone(), ContextServerStatus::Starting),
                (server_id.clone(), ContextServerStatus::Stopped),
                (server_id.clone(), ContextServerStatus::Starting),
                (server_id.clone(), ContextServerStatus::Running),
            ],
            cx,
        );

        // Both starts are issued before the executor gets a chance to run.
        store.update(cx, |store, cx| {
            store.start_server(server_with_same_id_1.clone(), cx)
        });
        store.update(cx, |store, cx| {
            store.start_server(server_with_same_id_2.clone(), cx)
        });

        cx.run_until_parked();

        cx.update(|cx| {
            assert_eq!(
                store.read(cx).status_for_server(&server_id),
                Some(ContextServerStatus::Running)
            );
        });
    }
908
    /// Drives the maintenance loop through a series of settings changes and
    /// checks the resulting status events: initial start, restart on a
    /// configuration change, no-op on an unchanged configuration, start of a
    /// newly added server, restart on changed arguments, and removal.
    #[gpui::test]
    async fn test_context_server_maintain_servers_loop(cx: &mut TestAppContext) {
        const SERVER_1_ID: &str = "mcp-1";
        const SERVER_2_ID: &str = "mcp-2";

        let server_1_id = ContextServerId(SERVER_1_ID.into());
        let server_2_id = ContextServerId(SERVER_2_ID.into());

        let fake_descriptor_1 = Arc::new(FakeContextServerDescriptor::new(SERVER_1_ID));

        let (_fs, project) = setup_context_server_test(
            cx,
            json!({"code.rs": ""}),
            vec![(
                SERVER_1_ID.into(),
                ContextServerSettings::Extension {
                    enabled: true,
                    settings: json!({
                        "somevalue": true
                    }),
                },
            )],
        )
        .await;

        let executor = cx.executor();
        let registry = cx.new(|cx| {
            let mut registry = ContextServerDescriptorRegistry::new();
            registry.register_context_server_descriptor(SERVER_1_ID.into(), fake_descriptor_1, cx);
            registry
        });
        // The factory makes every server use a fake transport, whatever its
        // configuration says.
        let store = cx.new(|cx| {
            ContextServerStore::test_maintain_server_loop(
                Some(Box::new(move |id, _| {
                    Arc::new(ContextServer::new(
                        id.clone(),
                        Arc::new(create_fake_transport(id.0.to_string(), executor.clone())),
                    ))
                })),
                registry.clone(),
                project.read(cx).worktree_store(),
                project.downgrade(),
                cx,
            )
        });

        // Ensure that mcp-1 starts up
        {
            let _server_events = assert_server_events(
                &store,
                vec![
                    (server_1_id.clone(), ContextServerStatus::Starting),
                    (server_1_id.clone(), ContextServerStatus::Running),
                ],
                cx,
            );
            cx.run_until_parked();
        }

        // Ensure that mcp-1 is restarted when the configuration was changed
        {
            let _server_events = assert_server_events(
                &store,
                vec![
                    (server_1_id.clone(), ContextServerStatus::Stopped),
                    (server_1_id.clone(), ContextServerStatus::Starting),
                    (server_1_id.clone(), ContextServerStatus::Running),
                ],
                cx,
            );
            set_context_server_configuration(
                vec![(
                    server_1_id.0.clone(),
                    settings::ContextServerSettingsContent::Extension {
                        enabled: true,
                        settings: json!({
                            "somevalue": false
                        }),
                    },
                )],
                cx,
            );

            cx.run_until_parked();
        }

        // Ensure that mcp-1 is not restarted when the configuration was not changed
        {
            let _server_events = assert_server_events(&store, vec![], cx);
            set_context_server_configuration(
                vec![(
                    server_1_id.0.clone(),
                    settings::ContextServerSettingsContent::Extension {
                        enabled: true,
                        settings: json!({
                            "somevalue": false
                        }),
                    },
                )],
                cx,
            );

            cx.run_until_parked();
        }

        // Ensure that mcp-2 is started once it is added to the settings
        {
            let _server_events = assert_server_events(
                &store,
                vec![
                    (server_2_id.clone(), ContextServerStatus::Starting),
                    (server_2_id.clone(), ContextServerStatus::Running),
                ],
                cx,
            );
            set_context_server_configuration(
                vec![
                    (
                        server_1_id.0.clone(),
                        settings::ContextServerSettingsContent::Extension {
                            enabled: true,
                            settings: json!({
                                "somevalue": false
                            }),
                        },
                    ),
                    (
                        server_2_id.0.clone(),
                        settings::ContextServerSettingsContent::Stdio {
                            enabled: true,
                            command: ContextServerCommand {
                                path: "somebinary".into(),
                                args: vec!["arg".to_string()],
                                env: None,
                                timeout: None,
                            },
                        },
                    ),
                ],
                cx,
            );

            cx.run_until_parked();
        }

        // Ensure that mcp-2 is restarted once the args have changed
        {
            let _server_events = assert_server_events(
                &store,
                vec![
                    (server_2_id.clone(), ContextServerStatus::Stopped),
                    (server_2_id.clone(), ContextServerStatus::Starting),
                    (server_2_id.clone(), ContextServerStatus::Running),
                ],
                cx,
            );
            set_context_server_configuration(
                vec![
                    (
                        server_1_id.0.clone(),
                        settings::ContextServerSettingsContent::Extension {
                            enabled: true,
                            settings: json!({
                                "somevalue": false
                            }),
                        },
                    ),
                    (
                        server_2_id.0.clone(),
                        settings::ContextServerSettingsContent::Stdio {
                            enabled: true,
                            command: ContextServerCommand {
                                path: "somebinary".into(),
                                args: vec!["anotherArg".to_string()],
                                env: None,
                                timeout: None,
                            },
                        },
                    ),
                ],
                cx,
            );

            cx.run_until_parked();
        }

        // Ensure that mcp-2 is removed once it is removed from the settings
        {
            let _server_events = assert_server_events(
                &store,
                vec![(server_2_id.clone(), ContextServerStatus::Stopped)],
                cx,
            );
            set_context_server_configuration(
                vec![(
                    server_1_id.0.clone(),
                    settings::ContextServerSettingsContent::Extension {
                        enabled: true,
                        settings: json!({
                            "somevalue": false
                        }),
                    },
                )],
                cx,
            );

            cx.run_until_parked();

            cx.update(|cx| {
                assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
            });
        }

        // Ensure that nothing happens if the settings do not change
        {
            let _server_events = assert_server_events(&store, vec![], cx);
            set_context_server_configuration(
                vec![(
                    server_1_id.0.clone(),
                    settings::ContextServerSettingsContent::Extension {
                        enabled: true,
                        settings: json!({
                            "somevalue": false
                        }),
                    },
                )],
                cx,
            );

            cx.run_until_parked();

            cx.update(|cx| {
                assert_eq!(
                    store.read(cx).status_for_server(&server_1_id),
                    Some(ContextServerStatus::Running)
                );
                assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
            });
        }
    }
1149
    /// Checks that toggling a server's `enabled` flag in the settings stops
    /// the server and starts it again.
    #[gpui::test]
    async fn test_context_server_enabled_disabled(cx: &mut TestAppContext) {
        const SERVER_1_ID: &str = "mcp-1";

        let server_1_id = ContextServerId(SERVER_1_ID.into());

        let (_fs, project) = setup_context_server_test(
            cx,
            json!({"code.rs": ""}),
            vec![(
                SERVER_1_ID.into(),
                ContextServerSettings::Stdio {
                    enabled: true,
                    command: ContextServerCommand {
                        path: "somebinary".into(),
                        args: vec!["arg".to_string()],
                        env: None,
                        timeout: None,
                    },
                },
            )],
        )
        .await;

        let executor = cx.executor();
        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
        // The factory makes the server use a fake transport instead of
        // launching the configured binary.
        let store = cx.new(|cx| {
            ContextServerStore::test_maintain_server_loop(
                Some(Box::new(move |id, _| {
                    Arc::new(ContextServer::new(
                        id.clone(),
                        Arc::new(create_fake_transport(id.0.to_string(), executor.clone())),
                    ))
                })),
                registry.clone(),
                project.read(cx).worktree_store(),
                project.downgrade(),
                cx,
            )
        });

        // Ensure that mcp-1 starts up
        {
            let _server_events = assert_server_events(
                &store,
                vec![
                    (server_1_id.clone(), ContextServerStatus::Starting),
                    (server_1_id.clone(), ContextServerStatus::Running),
                ],
                cx,
            );
            cx.run_until_parked();
        }

        // Ensure that mcp-1 is stopped once it is disabled.
        {
            let _server_events = assert_server_events(
                &store,
                vec![(server_1_id.clone(), ContextServerStatus::Stopped)],
                cx,
            );
            set_context_server_configuration(
                vec![(
                    server_1_id.0.clone(),
                    settings::ContextServerSettingsContent::Stdio {
                        enabled: false,
                        command: ContextServerCommand {
                            path: "somebinary".into(),
                            args: vec!["arg".to_string()],
                            env: None,
                            timeout: None,
                        },
                    },
                )],
                cx,
            );

            cx.run_until_parked();
        }

        // Ensure that mcp-1 is started once it is enabled again.
        {
            let _server_events = assert_server_events(
                &store,
                vec![
                    (server_1_id.clone(), ContextServerStatus::Starting),
                    (server_1_id.clone(), ContextServerStatus::Running),
                ],
                cx,
            );
            set_context_server_configuration(
                vec![(
                    server_1_id.0.clone(),
                    settings::ContextServerSettingsContent::Stdio {
                        enabled: true,
                        command: ContextServerCommand {
                            path: "somebinary".into(),
                            args: vec!["arg".to_string()],
                            timeout: None,
                            env: None,
                        },
                    },
                )],
                cx,
            );

            cx.run_until_parked();
        }
    }
1259
1260 fn set_context_server_configuration(
1261 context_servers: Vec<(Arc<str>, settings::ContextServerSettingsContent)>,
1262 cx: &mut TestAppContext,
1263 ) {
1264 cx.update(|cx| {
1265 SettingsStore::update_global(cx, |store, cx| {
1266 store.update_user_settings(cx, |content| {
1267 content.project.context_servers.clear();
1268 for (id, config) in context_servers {
1269 content.project.context_servers.insert(id, config);
1270 }
1271 });
1272 })
1273 });
1274 }
1275
1276 #[gpui::test]
1277 async fn test_remote_context_server(cx: &mut TestAppContext) {
1278 const SERVER_ID: &str = "remote-server";
1279 let server_id = ContextServerId(SERVER_ID.into());
1280 let server_url = "http://example.com/api";
1281
1282 let (_fs, project) = setup_context_server_test(
1283 cx,
1284 json!({ "code.rs": "" }),
1285 vec![(
1286 SERVER_ID.into(),
1287 ContextServerSettings::Http {
1288 enabled: true,
1289 url: server_url.to_string(),
1290 headers: Default::default(),
1291 timeout: None,
1292 },
1293 )],
1294 )
1295 .await;
1296
1297 let client = FakeHttpClient::create(|_| async move {
1298 use http_client::AsyncBody;
1299
1300 let response = Response::builder()
1301 .status(200)
1302 .header("Content-Type", "application/json")
1303 .body(AsyncBody::from(
1304 serde_json::to_string(&json!({
1305 "jsonrpc": "2.0",
1306 "id": 0,
1307 "result": {
1308 "protocolVersion": "2024-11-05",
1309 "capabilities": {},
1310 "serverInfo": {
1311 "name": "test-server",
1312 "version": "1.0.0"
1313 }
1314 }
1315 }))
1316 .unwrap(),
1317 ))
1318 .unwrap();
1319 Ok(response)
1320 });
1321 cx.update(|cx| cx.set_http_client(client));
1322 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1323 let store = cx.new(|cx| {
1324 ContextServerStore::test_maintain_server_loop(
1325 None,
1326 registry.clone(),
1327 project.read(cx).worktree_store(),
1328 project.downgrade(),
1329 cx,
1330 )
1331 });
1332
1333 let _server_events = assert_server_events(
1334 &store,
1335 vec![
1336 (server_id.clone(), ContextServerStatus::Starting),
1337 (server_id.clone(), ContextServerStatus::Running),
1338 ],
1339 cx,
1340 );
1341 cx.run_until_parked();
1342 }
1343
1344 struct ServerEvents {
1345 received_event_count: Rc<RefCell<usize>>,
1346 expected_event_count: usize,
1347 _subscription: Subscription,
1348 }
1349
1350 impl Drop for ServerEvents {
1351 fn drop(&mut self) {
1352 let actual_event_count = *self.received_event_count.borrow();
1353 assert_eq!(
1354 actual_event_count, self.expected_event_count,
1355 "
1356 Expected to receive {} context server store events, but received {} events",
1357 self.expected_event_count, actual_event_count
1358 );
1359 }
1360 }
1361
1362 #[gpui::test]
1363 async fn test_context_server_global_timeout(cx: &mut TestAppContext) {
1364 cx.update(|cx| {
1365 let settings_store = SettingsStore::test(cx);
1366 cx.set_global(settings_store);
1367 SettingsStore::update_global(cx, |store, cx| {
1368 store
1369 .set_user_settings(r#"{"context_server_timeout": 90}"#, cx)
1370 .expect("Failed to set test user settings");
1371 });
1372 });
1373
1374 let (_fs, project) = setup_context_server_test(cx, json!({"code.rs": ""}), vec![]).await;
1375
1376 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1377 let store = cx.new(|cx| {
1378 ContextServerStore::test(
1379 registry.clone(),
1380 project.read(cx).worktree_store(),
1381 project.downgrade(),
1382 cx,
1383 )
1384 });
1385
1386 let result = store.update(cx, |store, cx| {
1387 store.create_context_server(
1388 ContextServerId("test-server".into()),
1389 Arc::new(ContextServerConfiguration::Http {
1390 url: url::Url::parse("http://localhost:8080")
1391 .expect("Failed to parse test URL"),
1392 headers: Default::default(),
1393 timeout: None,
1394 }),
1395 cx,
1396 )
1397 });
1398
1399 assert!(
1400 result.is_ok(),
1401 "Server should be created successfully with global timeout"
1402 );
1403 }
1404
1405 #[gpui::test]
1406 async fn test_context_server_per_server_timeout_override(cx: &mut TestAppContext) {
1407 const SERVER_ID: &str = "test-server";
1408
1409 cx.update(|cx| {
1410 let settings_store = SettingsStore::test(cx);
1411 cx.set_global(settings_store);
1412 SettingsStore::update_global(cx, |store, cx| {
1413 store
1414 .set_user_settings(r#"{"context_server_timeout": 60}"#, cx)
1415 .expect("Failed to set test user settings");
1416 });
1417 });
1418
1419 let (_fs, project) = setup_context_server_test(
1420 cx,
1421 json!({"code.rs": ""}),
1422 vec![(
1423 SERVER_ID.into(),
1424 ContextServerSettings::Http {
1425 enabled: true,
1426 url: "http://localhost:8080".to_string(),
1427 headers: Default::default(),
1428 timeout: Some(120),
1429 },
1430 )],
1431 )
1432 .await;
1433
1434 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1435 let store = cx.new(|cx| {
1436 ContextServerStore::test(
1437 registry.clone(),
1438 project.read(cx).worktree_store(),
1439 project.downgrade(),
1440 cx,
1441 )
1442 });
1443
1444 let result = store.update(cx, |store, cx| {
1445 store.create_context_server(
1446 ContextServerId("test-server".into()),
1447 Arc::new(ContextServerConfiguration::Http {
1448 url: url::Url::parse("http://localhost:8080")
1449 .expect("Failed to parse test URL"),
1450 headers: Default::default(),
1451 timeout: Some(120),
1452 }),
1453 cx,
1454 )
1455 });
1456
1457 assert!(
1458 result.is_ok(),
1459 "Server should be created successfully with per-server timeout override"
1460 );
1461 }
1462
1463 #[gpui::test]
1464 async fn test_context_server_stdio_timeout(cx: &mut TestAppContext) {
1465 const SERVER_ID: &str = "stdio-server";
1466
1467 let (_fs, project) = setup_context_server_test(
1468 cx,
1469 json!({"code.rs": ""}),
1470 vec![(
1471 SERVER_ID.into(),
1472 ContextServerSettings::Stdio {
1473 enabled: true,
1474 command: ContextServerCommand {
1475 path: "/usr/bin/node".into(),
1476 args: vec!["server.js".into()],
1477 env: None,
1478 timeout: Some(180000),
1479 },
1480 },
1481 )],
1482 )
1483 .await;
1484
1485 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1486 let store = cx.new(|cx| {
1487 ContextServerStore::test(
1488 registry.clone(),
1489 project.read(cx).worktree_store(),
1490 project.downgrade(),
1491 cx,
1492 )
1493 });
1494
1495 let result = store.update(cx, |store, cx| {
1496 store.create_context_server(
1497 ContextServerId("stdio-server".into()),
1498 Arc::new(ContextServerConfiguration::Custom {
1499 command: ContextServerCommand {
1500 path: "/usr/bin/node".into(),
1501 args: vec!["server.js".into()],
1502 env: None,
1503 timeout: Some(180000),
1504 },
1505 }),
1506 cx,
1507 )
1508 });
1509
1510 assert!(
1511 result.is_ok(),
1512 "Stdio server should be created successfully with timeout"
1513 );
1514 }
1515
1516 fn dummy_server_settings() -> ContextServerSettings {
1517 ContextServerSettings::Stdio {
1518 enabled: true,
1519 command: ContextServerCommand {
1520 path: "somebinary".into(),
1521 args: vec!["arg".to_string()],
1522 env: None,
1523 timeout: None,
1524 },
1525 }
1526 }
1527
1528 fn assert_server_events(
1529 store: &Entity<ContextServerStore>,
1530 expected_events: Vec<(ContextServerId, ContextServerStatus)>,
1531 cx: &mut TestAppContext,
1532 ) -> ServerEvents {
1533 cx.update(|cx| {
1534 let mut ix = 0;
1535 let received_event_count = Rc::new(RefCell::new(0));
1536 let expected_event_count = expected_events.len();
1537 let subscription = cx.subscribe(store, {
1538 let received_event_count = received_event_count.clone();
1539 move |_, event, _| match event {
1540 Event::ServerStatusChanged {
1541 server_id: actual_server_id,
1542 status: actual_status,
1543 } => {
1544 let (expected_server_id, expected_status) = &expected_events[ix];
1545
1546 assert_eq!(
1547 actual_server_id, expected_server_id,
1548 "Expected different server id at index {}",
1549 ix
1550 );
1551 assert_eq!(
1552 actual_status, expected_status,
1553 "Expected different status at index {}",
1554 ix
1555 );
1556 ix += 1;
1557 *received_event_count.borrow_mut() += 1;
1558 }
1559 }
1560 });
1561 ServerEvents {
1562 expected_event_count,
1563 received_event_count,
1564 _subscription: subscription,
1565 }
1566 })
1567 }
1568
1569 async fn setup_context_server_test(
1570 cx: &mut TestAppContext,
1571 files: serde_json::Value,
1572 context_server_configurations: Vec<(Arc<str>, ContextServerSettings)>,
1573 ) -> (Arc<FakeFs>, Entity<Project>) {
1574 cx.update(|cx| {
1575 let settings_store = SettingsStore::test(cx);
1576 cx.set_global(settings_store);
1577 let mut settings = ProjectSettings::get_global(cx).clone();
1578 for (id, config) in context_server_configurations {
1579 settings.context_servers.insert(id, config);
1580 }
1581 ProjectSettings::override_global(settings, cx);
1582 });
1583
1584 let fs = FakeFs::new(cx.executor());
1585 fs.insert_tree(path!("/test"), files).await;
1586 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
1587
1588 (fs, project)
1589 }
1590
/// Test descriptor that remembers a single path, which its `command` implementation
/// reports as the path of the server binary.
struct FakeContextServerDescriptor {
    path: PathBuf,
}

impl FakeContextServerDescriptor {
    /// Creates a descriptor for the given path; accepts anything convertible into a
    /// `PathBuf`.
    fn new(path: impl Into<PathBuf>) -> Self {
        let path = path.into();
        Self { path }
    }
}
1600
1601 impl ContextServerDescriptor for FakeContextServerDescriptor {
1602 fn command(
1603 &self,
1604 _worktree_store: Entity<WorktreeStore>,
1605 _cx: &AsyncApp,
1606 ) -> Task<Result<ContextServerCommand>> {
1607 Task::ready(Ok(ContextServerCommand {
1608 path: self.path.clone(),
1609 args: vec!["arg1".to_string(), "arg2".to_string()],
1610 env: None,
1611 timeout: None,
1612 }))
1613 }
1614
1615 fn configuration(
1616 &self,
1617 _worktree_store: Entity<WorktreeStore>,
1618 _cx: &AsyncApp,
1619 ) -> Task<Result<Option<::extension::ContextServerConfiguration>>> {
1620 Task::ready(Ok(None))
1621 }
1622 }
1623}