1pub mod extension;
2pub mod registry;
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{Context as _, Result};
8use collections::{HashMap, HashSet};
9use context_server::{ContextServer, ContextServerCommand, ContextServerId};
10use futures::{FutureExt as _, future::join_all};
11use gpui::{App, AsyncApp, Context, Entity, EventEmitter, Subscription, Task, WeakEntity, actions};
12use registry::ContextServerDescriptorRegistry;
13use settings::{Settings as _, SettingsStore};
14use util::{ResultExt as _, rel_path::RelPath};
15
16use crate::{
17 Project,
18 project_settings::{ContextServerSettings, ProjectSettings},
19 worktree_store::WorktreeStore,
20};
21
/// Upper bound, in milliseconds, for any context server request timeout (10 minutes).
/// Clamping to this value keeps an oversized configured timeout from tying up
/// resources indefinitely.
const MAX_TIMEOUT_MS: u64 = 10 * 60 * 1_000;
25
/// Initializes this module by delegating to [`extension::init`].
pub fn init(cx: &mut App) {
    extension::init(cx);
}
29
// Declares the `Restart` action through gpui's `actions!` macro, grouped under
// the `context_server` identifier.
actions!(
    context_server,
    [
        /// Restarts the context server.
        Restart
    ]
);
37
/// Public, payload-free view of a context server's lifecycle state.
///
/// Derived from the store's internal `ContextServerState` and reported through
/// [`Event::ServerStatusChanged`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ContextServerStatus {
    /// A start task for the server has been spawned and has not completed yet.
    Starting,
    /// The server started successfully.
    Running,
    /// The server was stopped (or removed from the store).
    Stopped,
    /// The server failed to start; carries the error rendered as text.
    Error(Arc<str>),
}
45
46impl ContextServerStatus {
47 fn from_state(state: &ContextServerState) -> Self {
48 match state {
49 ContextServerState::Starting { .. } => ContextServerStatus::Starting,
50 ContextServerState::Running { .. } => ContextServerStatus::Running,
51 ContextServerState::Stopped { .. } => ContextServerStatus::Stopped,
52 ContextServerState::Error { error, .. } => ContextServerStatus::Error(error.clone()),
53 }
54 }
55}
56
/// Internal per-server state tracked by `ContextServerStore`.
///
/// Every variant keeps the server handle and the configuration it was created
/// from, so both remain available regardless of the lifecycle phase.
enum ContextServerState {
    /// The server is being started; `_task` is the spawned start task and is
    /// owned by this state, so it is dropped together with it.
    Starting {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
        _task: Task<()>,
    },
    /// The server started successfully.
    Running {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
    },
    /// The server was stopped but is still tracked by the store.
    Stopped {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
    },
    /// Starting the server failed; `error` holds the failure rendered as text.
    Error {
        server: Arc<ContextServer>,
        configuration: Arc<ContextServerConfiguration>,
        error: Arc<str>,
    },
}
77
78impl ContextServerState {
79 pub fn server(&self) -> Arc<ContextServer> {
80 match self {
81 ContextServerState::Starting { server, .. } => server.clone(),
82 ContextServerState::Running { server, .. } => server.clone(),
83 ContextServerState::Stopped { server, .. } => server.clone(),
84 ContextServerState::Error { server, .. } => server.clone(),
85 }
86 }
87
88 pub fn configuration(&self) -> Arc<ContextServerConfiguration> {
89 match self {
90 ContextServerState::Starting { configuration, .. } => configuration.clone(),
91 ContextServerState::Running { configuration, .. } => configuration.clone(),
92 ContextServerState::Stopped { configuration, .. } => configuration.clone(),
93 ContextServerState::Error { configuration, .. } => configuration.clone(),
94 }
95 }
96}
97
/// Resolved configuration a context server is created from.
///
/// Compared with `==` by the store to decide whether a server has to be
/// recreated after a settings change.
#[derive(Debug, PartialEq, Eq)]
pub enum ContextServerConfiguration {
    /// A server launched from a command given directly in the settings.
    Custom {
        command: ContextServerCommand,
    },
    /// A server provided by an extension: the command comes from the registered
    /// descriptor, `settings` is the JSON value taken from the settings.
    Extension {
        command: ContextServerCommand,
        settings: serde_json::Value,
    },
    /// A server reached over HTTP.
    Http {
        url: url::Url,
        headers: HashMap<String, String>,
        /// Optional per-server request timeout in milliseconds.
        timeout: Option<u64>,
    },
}
113
114impl ContextServerConfiguration {
115 pub fn command(&self) -> Option<&ContextServerCommand> {
116 match self {
117 ContextServerConfiguration::Custom { command } => Some(command),
118 ContextServerConfiguration::Extension { command, .. } => Some(command),
119 ContextServerConfiguration::Http { .. } => None,
120 }
121 }
122
123 pub async fn from_settings(
124 settings: ContextServerSettings,
125 id: ContextServerId,
126 registry: Entity<ContextServerDescriptorRegistry>,
127 worktree_store: Entity<WorktreeStore>,
128 cx: &AsyncApp,
129 ) -> Option<Self> {
130 match settings {
131 ContextServerSettings::Stdio {
132 enabled: _,
133 command,
134 } => Some(ContextServerConfiguration::Custom { command }),
135 ContextServerSettings::Extension {
136 enabled: _,
137 settings,
138 } => {
139 let descriptor = cx
140 .update(|cx| registry.read(cx).context_server_descriptor(&id.0))
141 .ok()
142 .flatten()?;
143
144 match descriptor.command(worktree_store, cx).await {
145 Ok(command) => {
146 Some(ContextServerConfiguration::Extension { command, settings })
147 }
148 Err(e) => {
149 log::error!(
150 "Failed to create context server configuration from settings: {e:#}"
151 );
152 None
153 }
154 }
155 }
156 ContextServerSettings::Http {
157 enabled: _,
158 url,
159 headers: auth,
160 timeout,
161 } => {
162 let url = url::Url::parse(&url).log_err()?;
163 Some(ContextServerConfiguration::Http {
164 url,
165 headers: auth,
166 timeout,
167 })
168 }
169 }
170 }
171}
172
/// Callback that builds a [`ContextServer`] from an id and its configuration.
///
/// When the store holds one, `create_context_server` calls it instead of
/// constructing an HTTP or stdio server itself.
pub type ContextServerFactory =
    Box<dyn Fn(ContextServerId, Arc<ContextServerConfiguration>) -> Arc<ContextServer>>;
175
/// Tracks context servers and their lifecycle state, and emits
/// [`Event::ServerStatusChanged`] whenever a server's state is updated.
pub struct ContextServerStore {
    /// Settings per server, keyed by the server id string.
    context_server_settings: HashMap<Arc<str>, ContextServerSettings>,
    /// Current state of every server known to the store.
    servers: HashMap<ContextServerId, ContextServerState>,
    worktree_store: Entity<WorktreeStore>,
    project: WeakEntity<Project>,
    /// Registry of context server descriptors (used for extension servers).
    registry: Entity<ContextServerDescriptorRegistry>,
    /// The reconciliation task currently in flight, if any.
    update_servers_task: Option<Task<Result<()>>>,
    /// Optional override used to construct servers (set by the test constructors).
    context_server_factory: Option<ContextServerFactory>,
    /// Set when a change arrives while `update_servers_task` is still running,
    /// so another reconciliation pass is started once it finishes.
    needs_server_update: bool,
    /// Observer subscriptions on the registry and the settings store; held here
    /// for the lifetime of the store.
    _subscriptions: Vec<Subscription>,
}
187
/// Events emitted by [`ContextServerStore`].
pub enum Event {
    /// The state of `server_id` was updated (or the server was removed, in
    /// which case `status` is `Stopped`).
    ServerStatusChanged {
        server_id: ContextServerId,
        status: ContextServerStatus,
    },
}

impl EventEmitter<Event> for ContextServerStore {}
196
197impl ContextServerStore {
198 pub fn new(
199 worktree_store: Entity<WorktreeStore>,
200 weak_project: WeakEntity<Project>,
201 cx: &mut Context<Self>,
202 ) -> Self {
203 Self::new_internal(
204 true,
205 None,
206 ContextServerDescriptorRegistry::default_global(cx),
207 worktree_store,
208 weak_project,
209 cx,
210 )
211 }
212
213 /// Returns all configured context server ids, excluding the ones that are disabled
214 pub fn configured_server_ids(&self) -> Vec<ContextServerId> {
215 self.context_server_settings
216 .iter()
217 .filter(|(_, settings)| settings.enabled())
218 .map(|(id, _)| ContextServerId(id.clone()))
219 .collect()
220 }
221
222 #[cfg(any(test, feature = "test-support"))]
223 pub fn test(
224 registry: Entity<ContextServerDescriptorRegistry>,
225 worktree_store: Entity<WorktreeStore>,
226 weak_project: WeakEntity<Project>,
227 cx: &mut Context<Self>,
228 ) -> Self {
229 Self::new_internal(false, None, registry, worktree_store, weak_project, cx)
230 }
231
232 #[cfg(any(test, feature = "test-support"))]
233 pub fn test_maintain_server_loop(
234 context_server_factory: Option<ContextServerFactory>,
235 registry: Entity<ContextServerDescriptorRegistry>,
236 worktree_store: Entity<WorktreeStore>,
237 weak_project: WeakEntity<Project>,
238 cx: &mut Context<Self>,
239 ) -> Self {
240 Self::new_internal(
241 true,
242 context_server_factory,
243 registry,
244 worktree_store,
245 weak_project,
246 cx,
247 )
248 }
249
250 fn new_internal(
251 maintain_server_loop: bool,
252 context_server_factory: Option<ContextServerFactory>,
253 registry: Entity<ContextServerDescriptorRegistry>,
254 worktree_store: Entity<WorktreeStore>,
255 weak_project: WeakEntity<Project>,
256 cx: &mut Context<Self>,
257 ) -> Self {
258 let subscriptions = if maintain_server_loop {
259 vec![
260 cx.observe(®istry, |this, _registry, cx| {
261 this.available_context_servers_changed(cx);
262 }),
263 cx.observe_global::<SettingsStore>(|this, cx| {
264 let settings = Self::resolve_context_server_settings(&this.worktree_store, cx);
265 if &this.context_server_settings == settings {
266 return;
267 }
268 this.context_server_settings = settings.clone();
269 this.available_context_servers_changed(cx);
270 }),
271 ]
272 } else {
273 Vec::new()
274 };
275
276 let mut this = Self {
277 _subscriptions: subscriptions,
278 context_server_settings: Self::resolve_context_server_settings(&worktree_store, cx)
279 .clone(),
280 worktree_store,
281 project: weak_project,
282 registry,
283 needs_server_update: false,
284 servers: HashMap::default(),
285 update_servers_task: None,
286 context_server_factory,
287 };
288 if maintain_server_loop {
289 this.available_context_servers_changed(cx);
290 }
291 this
292 }
293
294 pub fn get_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
295 self.servers.get(id).map(|state| state.server())
296 }
297
298 pub fn get_running_server(&self, id: &ContextServerId) -> Option<Arc<ContextServer>> {
299 if let Some(ContextServerState::Running { server, .. }) = self.servers.get(id) {
300 Some(server.clone())
301 } else {
302 None
303 }
304 }
305
306 pub fn status_for_server(&self, id: &ContextServerId) -> Option<ContextServerStatus> {
307 self.servers.get(id).map(ContextServerStatus::from_state)
308 }
309
310 pub fn configuration_for_server(
311 &self,
312 id: &ContextServerId,
313 ) -> Option<Arc<ContextServerConfiguration>> {
314 self.servers.get(id).map(|state| state.configuration())
315 }
316
317 pub fn server_ids(&self, cx: &App) -> HashSet<ContextServerId> {
318 self.servers
319 .keys()
320 .cloned()
321 .chain(
322 self.registry
323 .read(cx)
324 .context_server_descriptors()
325 .into_iter()
326 .map(|(id, _)| ContextServerId(id)),
327 )
328 .collect()
329 }
330
331 pub fn running_servers(&self) -> Vec<Arc<ContextServer>> {
332 self.servers
333 .values()
334 .filter_map(|state| {
335 if let ContextServerState::Running { server, .. } = state {
336 Some(server.clone())
337 } else {
338 None
339 }
340 })
341 .collect()
342 }
343
344 pub fn start_server(&mut self, server: Arc<ContextServer>, cx: &mut Context<Self>) {
345 cx.spawn(async move |this, cx| {
346 let this = this.upgrade().context("Context server store dropped")?;
347 let settings = this
348 .update(cx, |this, _| {
349 this.context_server_settings.get(&server.id().0).cloned()
350 })
351 .ok()
352 .flatten()
353 .context("Failed to get context server settings")?;
354
355 if !settings.enabled() {
356 return Ok(());
357 }
358
359 let (registry, worktree_store) = this.update(cx, |this, _| {
360 (this.registry.clone(), this.worktree_store.clone())
361 })?;
362 let configuration = ContextServerConfiguration::from_settings(
363 settings,
364 server.id(),
365 registry,
366 worktree_store,
367 cx,
368 )
369 .await
370 .context("Failed to create context server configuration")?;
371
372 this.update(cx, |this, cx| {
373 this.run_server(server, Arc::new(configuration), cx)
374 })
375 })
376 .detach_and_log_err(cx);
377 }
378
379 pub fn stop_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
380 if matches!(
381 self.servers.get(id),
382 Some(ContextServerState::Stopped { .. })
383 ) {
384 return Ok(());
385 }
386
387 let state = self
388 .servers
389 .remove(id)
390 .context("Context server not found")?;
391
392 let server = state.server();
393 let configuration = state.configuration();
394 let mut result = Ok(());
395 if let ContextServerState::Running { server, .. } = &state {
396 result = server.stop();
397 }
398 drop(state);
399
400 self.update_server_state(
401 id.clone(),
402 ContextServerState::Stopped {
403 configuration,
404 server,
405 },
406 cx,
407 );
408
409 result
410 }
411
412 fn run_server(
413 &mut self,
414 server: Arc<ContextServer>,
415 configuration: Arc<ContextServerConfiguration>,
416 cx: &mut Context<Self>,
417 ) {
418 let id = server.id();
419 if matches!(
420 self.servers.get(&id),
421 Some(ContextServerState::Starting { .. } | ContextServerState::Running { .. })
422 ) {
423 self.stop_server(&id, cx).log_err();
424 }
425 let task = cx.spawn({
426 let id = server.id();
427 let server = server.clone();
428 let configuration = configuration.clone();
429
430 async move |this, cx| {
431 match server.clone().start(cx).await {
432 Ok(_) => {
433 debug_assert!(server.client().is_some());
434
435 this.update(cx, |this, cx| {
436 this.update_server_state(
437 id.clone(),
438 ContextServerState::Running {
439 server,
440 configuration,
441 },
442 cx,
443 )
444 })
445 .log_err()
446 }
447 Err(err) => {
448 log::error!("{} context server failed to start: {}", id, err);
449 this.update(cx, |this, cx| {
450 this.update_server_state(
451 id.clone(),
452 ContextServerState::Error {
453 configuration,
454 server,
455 error: err.to_string().into(),
456 },
457 cx,
458 )
459 })
460 .log_err()
461 }
462 };
463 }
464 });
465
466 self.update_server_state(
467 id.clone(),
468 ContextServerState::Starting {
469 configuration,
470 _task: task,
471 server,
472 },
473 cx,
474 );
475 }
476
477 fn remove_server(&mut self, id: &ContextServerId, cx: &mut Context<Self>) -> Result<()> {
478 let state = self
479 .servers
480 .remove(id)
481 .context("Context server not found")?;
482 drop(state);
483 cx.emit(Event::ServerStatusChanged {
484 server_id: id.clone(),
485 status: ContextServerStatus::Stopped,
486 });
487 Ok(())
488 }
489
490 fn create_context_server(
491 &self,
492 id: ContextServerId,
493 configuration: Arc<ContextServerConfiguration>,
494 cx: &mut Context<Self>,
495 ) -> Result<Arc<ContextServer>> {
496 // Get global timeout from settings
497 let global_timeout = ProjectSettings::get_global(cx).context_server_timeout;
498
499 if let Some(factory) = self.context_server_factory.as_ref() {
500 return Ok(factory(id, configuration));
501 }
502
503 match configuration.as_ref() {
504 ContextServerConfiguration::Http {
505 url,
506 headers,
507 timeout,
508 } => {
509 // Apply timeout precedence for HTTP servers: per-server > global
510 // Cap at MAX_TIMEOUT_MS to prevent extremely large timeout values
511 let resolved_timeout = timeout.unwrap_or(global_timeout).min(MAX_TIMEOUT_MS);
512
513 Ok(Arc::new(ContextServer::http(
514 id,
515 url,
516 headers.clone(),
517 cx.http_client(),
518 cx.background_executor().clone(),
519 Some(Duration::from_millis(resolved_timeout)),
520 )?))
521 }
522 _ => {
523 let root_path = self
524 .project
525 .read_with(cx, |project, cx| project.active_project_directory(cx))
526 .ok()
527 .flatten()
528 .or_else(|| {
529 self.worktree_store.read_with(cx, |store, cx| {
530 store.visible_worktrees(cx).fold(None, |acc, item| {
531 if acc.is_none() {
532 item.read(cx).root_dir()
533 } else {
534 acc
535 }
536 })
537 })
538 });
539
540 // Apply timeout precedence for stdio servers: per-server > global
541 // Cap at MAX_TIMEOUT_MS to prevent extremely large timeout values
542 let mut command_with_timeout = configuration
543 .command()
544 .context("Missing command configuration for stdio context server")?
545 .clone();
546 if command_with_timeout.timeout.is_none() {
547 command_with_timeout.timeout = Some(global_timeout.min(MAX_TIMEOUT_MS));
548 } else {
549 command_with_timeout.timeout =
550 command_with_timeout.timeout.map(|t| t.min(MAX_TIMEOUT_MS));
551 }
552
553 Ok(Arc::new(ContextServer::stdio(
554 id,
555 command_with_timeout,
556 root_path,
557 )))
558 }
559 }
560 }
561
562 fn resolve_context_server_settings<'a>(
563 worktree_store: &'a Entity<WorktreeStore>,
564 cx: &'a App,
565 ) -> &'a HashMap<Arc<str>, ContextServerSettings> {
566 let location = worktree_store
567 .read(cx)
568 .visible_worktrees(cx)
569 .next()
570 .map(|worktree| settings::SettingsLocation {
571 worktree_id: worktree.read(cx).id(),
572 path: RelPath::empty(),
573 });
574 &ProjectSettings::get(location, cx).context_servers
575 }
576
577 fn update_server_state(
578 &mut self,
579 id: ContextServerId,
580 state: ContextServerState,
581 cx: &mut Context<Self>,
582 ) {
583 let status = ContextServerStatus::from_state(&state);
584 self.servers.insert(id.clone(), state);
585 cx.emit(Event::ServerStatusChanged {
586 server_id: id,
587 status,
588 });
589 }
590
591 fn available_context_servers_changed(&mut self, cx: &mut Context<Self>) {
592 if self.update_servers_task.is_some() {
593 self.needs_server_update = true;
594 } else {
595 self.needs_server_update = false;
596 self.update_servers_task = Some(cx.spawn(async move |this, cx| {
597 if let Err(err) = Self::maintain_servers(this.clone(), cx).await {
598 log::error!("Error maintaining context servers: {}", err);
599 }
600
601 this.update(cx, |this, cx| {
602 this.update_servers_task.take();
603 if this.needs_server_update {
604 this.available_context_servers_changed(cx);
605 }
606 })?;
607
608 Ok(())
609 }));
610 }
611 }
612
    /// Reconciles the tracked servers with the current settings and registry.
    ///
    /// Steps:
    /// 1. Snapshot the cached settings and add a default extension entry for
    ///    every registry descriptor that has no settings entry.
    /// 2. Split the entries into enabled and disabled ones, and resolve a
    ///    configuration for each enabled entry (entries whose configuration
    ///    cannot be resolved are dropped).
    /// 3. Decide, per server, whether it must be stopped, removed or (re)started.
    /// 4. Apply the decisions: stop, then remove, then start.
    ///
    /// # Errors
    /// Fails when the store is gone, when a server cannot be created, or when
    /// stopping/removing a server fails; the first such error aborts the pass.
    async fn maintain_servers(this: WeakEntity<Self>, cx: &mut AsyncApp) -> Result<()> {
        let (mut configured_servers, registry, worktree_store) = this.update(cx, |this, _| {
            (
                this.context_server_settings.clone(),
                this.registry.clone(),
                this.worktree_store.clone(),
            )
        })?;

        // Servers known to the registry but absent from the settings get the
        // default extension settings.
        for (id, _) in
            registry.read_with(cx, |registry, _| registry.context_server_descriptors())?
        {
            configured_servers
                .entry(id)
                .or_insert(ContextServerSettings::default_extension());
        }

        let (enabled_servers, disabled_servers): (HashMap<_, _>, HashMap<_, _>) =
            configured_servers
                .into_iter()
                .partition(|(_, settings)| settings.enabled());

        // Resolve all enabled entries concurrently; entries that yield no
        // configuration are filtered out.
        let configured_servers = join_all(enabled_servers.into_iter().map(|(id, settings)| {
            let id = ContextServerId(id);
            ContextServerConfiguration::from_settings(
                settings,
                id.clone(),
                registry.clone(),
                worktree_store.clone(),
                cx,
            )
            .map(|config| (id, config))
        }))
        .await
        .into_iter()
        .filter_map(|(id, config)| config.map(|config| (id, config)))
        .collect::<HashMap<_, _>>();

        let mut servers_to_start = Vec::new();
        let mut servers_to_remove = HashSet::default();
        let mut servers_to_stop = HashSet::default();

        this.update(cx, |this, cx| {
            for server_id in this.servers.keys() {
                // Tracked servers that are not in `configured_servers` are either
                // stopped (when they are merely disabled in the settings) or removed
                // from the store (e.g. when the user deleted them from the settings).
                if !configured_servers.contains_key(server_id) {
                    if disabled_servers.contains_key(&server_id.0) {
                        servers_to_stop.insert(server_id.clone());
                    } else {
                        servers_to_remove.insert(server_id.clone());
                    }
                }
            }

            for (id, config) in configured_servers {
                let state = this.servers.get(&id);
                let is_stopped = matches!(state, Some(ContextServerState::Stopped { .. }));
                let existing_config = state.as_ref().map(|state| state.configuration());
                // (Re)start when the server is new, its configuration changed, or
                // it is currently stopped.
                if existing_config.as_deref() != Some(&config) || is_stopped {
                    let config = Arc::new(config);
                    let server = this.create_context_server(id.clone(), config.clone(), cx)?;
                    servers_to_start.push((server, config));
                    // An already tracked server is stopped before its replacement starts.
                    if this.servers.contains_key(&id) {
                        servers_to_stop.insert(id);
                    }
                }
            }

            anyhow::Ok(())
        })??;

        this.update(cx, |this, cx| {
            for id in servers_to_stop {
                this.stop_server(&id, cx)?;
            }
            for id in servers_to_remove {
                this.remove_server(&id, cx)?;
            }
            for (server, config) in servers_to_start {
                this.run_server(server, config, cx);
            }
            anyhow::Ok(())
        })?
    }
698}
699
700#[cfg(test)]
701mod tests {
702 use super::*;
703 use crate::{
704 FakeFs, Project, context_server_store::registry::ContextServerDescriptor,
705 project_settings::ProjectSettings,
706 };
707 use context_server::test::create_fake_transport;
708 use gpui::{AppContext, TestAppContext, UpdateGlobal as _};
709 use http_client::{FakeHttpClient, Response};
710 use serde_json::json;
711 use std::{cell::RefCell, path::PathBuf, rc::Rc};
712 use util::path;
713
    /// Verifies `status_for_server` as two servers are started one after the
    /// other and the second one is then stopped.
    #[gpui::test]
    async fn test_context_server_status(cx: &mut TestAppContext) {
        const SERVER_1_ID: &str = "mcp-1";
        const SERVER_2_ID: &str = "mcp-2";

        let (_fs, project) = setup_context_server_test(
            cx,
            json!({"code.rs": ""}),
            vec![
                (SERVER_1_ID.into(), dummy_server_settings()),
                (SERVER_2_ID.into(), dummy_server_settings()),
            ],
        )
        .await;

        // `ContextServerStore::test` disables the maintenance loop, so servers
        // only start when the test asks for it.
        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
        let store = cx.new(|cx| {
            ContextServerStore::test(
                registry.clone(),
                project.read(cx).worktree_store(),
                project.downgrade(),
                cx,
            )
        });

        let server_1_id = ContextServerId(SERVER_1_ID.into());
        let server_2_id = ContextServerId(SERVER_2_ID.into());

        let server_1 = Arc::new(ContextServer::new(
            server_1_id.clone(),
            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
        ));
        let server_2 = Arc::new(ContextServer::new(
            server_2_id.clone(),
            Arc::new(create_fake_transport(SERVER_2_ID, cx.executor())),
        ));

        store.update(cx, |store, cx| store.start_server(server_1, cx));

        cx.run_until_parked();

        // Only the first server is known to the store so far.
        cx.update(|cx| {
            assert_eq!(
                store.read(cx).status_for_server(&server_1_id),
                Some(ContextServerStatus::Running)
            );
            assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
        });

        store.update(cx, |store, cx| store.start_server(server_2.clone(), cx));

        cx.run_until_parked();

        cx.update(|cx| {
            assert_eq!(
                store.read(cx).status_for_server(&server_1_id),
                Some(ContextServerStatus::Running)
            );
            assert_eq!(
                store.read(cx).status_for_server(&server_2_id),
                Some(ContextServerStatus::Running)
            );
        });

        store
            .update(cx, |store, cx| store.stop_server(&server_2_id, cx))
            .unwrap();

        // Stopping one server must not affect the other.
        cx.update(|cx| {
            assert_eq!(
                store.read(cx).status_for_server(&server_1_id),
                Some(ContextServerStatus::Running)
            );
            assert_eq!(
                store.read(cx).status_for_server(&server_2_id),
                Some(ContextServerStatus::Stopped)
            );
        });
    }
793
    /// Verifies the sequence of status events produced while two servers are
    /// started and the second one is stopped.
    #[gpui::test]
    async fn test_context_server_status_events(cx: &mut TestAppContext) {
        const SERVER_1_ID: &str = "mcp-1";
        const SERVER_2_ID: &str = "mcp-2";

        let (_fs, project) = setup_context_server_test(
            cx,
            json!({"code.rs": ""}),
            vec![
                (SERVER_1_ID.into(), dummy_server_settings()),
                (SERVER_2_ID.into(), dummy_server_settings()),
            ],
        )
        .await;

        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
        let store = cx.new(|cx| {
            ContextServerStore::test(
                registry.clone(),
                project.read(cx).worktree_store(),
                project.downgrade(),
                cx,
            )
        });

        let server_1_id = ContextServerId(SERVER_1_ID.into());
        let server_2_id = ContextServerId(SERVER_2_ID.into());

        let server_1 = Arc::new(ContextServer::new(
            server_1_id.clone(),
            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
        ));
        let server_2 = Arc::new(ContextServer::new(
            server_2_id.clone(),
            Arc::new(create_fake_transport(SERVER_2_ID, cx.executor())),
        ));

        // Expected events, in order. NOTE(review): `assert_server_events` is
        // defined outside this view; the returned guard is presumably what
        // performs the check, so it is kept alive until the end of the test.
        let _server_events = assert_server_events(
            &store,
            vec![
                (server_1_id.clone(), ContextServerStatus::Starting),
                (server_1_id, ContextServerStatus::Running),
                (server_2_id.clone(), ContextServerStatus::Starting),
                (server_2_id.clone(), ContextServerStatus::Running),
                (server_2_id.clone(), ContextServerStatus::Stopped),
            ],
            cx,
        );

        store.update(cx, |store, cx| store.start_server(server_1, cx));

        cx.run_until_parked();

        store.update(cx, |store, cx| store.start_server(server_2.clone(), cx));

        cx.run_until_parked();

        store
            .update(cx, |store, cx| store.stop_server(&server_2_id, cx))
            .unwrap();
    }
855
    /// Verifies that starting two servers with the same id back to back stops
    /// the first one and leaves the second one running.
    #[gpui::test(iterations = 25)]
    async fn test_context_server_concurrent_starts(cx: &mut TestAppContext) {
        const SERVER_1_ID: &str = "mcp-1";

        let (_fs, project) = setup_context_server_test(
            cx,
            json!({"code.rs": ""}),
            vec![(SERVER_1_ID.into(), dummy_server_settings())],
        )
        .await;

        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
        let store = cx.new(|cx| {
            ContextServerStore::test(
                registry.clone(),
                project.read(cx).worktree_store(),
                project.downgrade(),
                cx,
            )
        });

        let server_id = ContextServerId(SERVER_1_ID.into());

        let server_with_same_id_1 = Arc::new(ContextServer::new(
            server_id.clone(),
            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
        ));
        let server_with_same_id_2 = Arc::new(ContextServer::new(
            server_id.clone(),
            Arc::new(create_fake_transport(SERVER_1_ID, cx.executor())),
        ));

        // If we start another server with the same id, we should report that we stopped the previous one
        let _server_events = assert_server_events(
            &store,
            vec![
                (server_id.clone(), ContextServerStatus::Starting),
                (server_id.clone(), ContextServerStatus::Stopped),
                (server_id.clone(), ContextServerStatus::Starting),
                (server_id.clone(), ContextServerStatus::Running),
            ],
            cx,
        );

        // Both starts are issued before the executor gets a chance to run.
        store.update(cx, |store, cx| {
            store.start_server(server_with_same_id_1.clone(), cx)
        });
        store.update(cx, |store, cx| {
            store.start_server(server_with_same_id_2.clone(), cx)
        });

        cx.run_until_parked();

        cx.update(|cx| {
            assert_eq!(
                store.read(cx).status_for_server(&server_id),
                Some(ContextServerStatus::Running)
            );
        });
    }
916
    /// Exercises the maintenance loop end to end: initial start, restart on a
    /// configuration change, no-op on an unchanged configuration, start of a
    /// newly added server, restart on changed arguments, and removal.
    #[gpui::test]
    async fn test_context_server_maintain_servers_loop(cx: &mut TestAppContext) {
        const SERVER_1_ID: &str = "mcp-1";
        const SERVER_2_ID: &str = "mcp-2";

        let server_1_id = ContextServerId(SERVER_1_ID.into());
        let server_2_id = ContextServerId(SERVER_2_ID.into());

        let fake_descriptor_1 = Arc::new(FakeContextServerDescriptor::new(SERVER_1_ID));

        let (_fs, project) = setup_context_server_test(
            cx,
            json!({"code.rs": ""}),
            vec![(
                SERVER_1_ID.into(),
                ContextServerSettings::Extension {
                    enabled: true,
                    settings: json!({
                        "somevalue": true
                    }),
                },
            )],
        )
        .await;

        let executor = cx.executor();
        let registry = cx.new(|cx| {
            let mut registry = ContextServerDescriptorRegistry::new();
            registry.register_context_server_descriptor(SERVER_1_ID.into(), fake_descriptor_1, cx);
            registry
        });
        // The factory replaces real server construction with fake transports.
        let store = cx.new(|cx| {
            ContextServerStore::test_maintain_server_loop(
                Some(Box::new(move |id, _| {
                    Arc::new(ContextServer::new(
                        id.clone(),
                        Arc::new(create_fake_transport(id.0.to_string(), executor.clone())),
                    ))
                })),
                registry.clone(),
                project.read(cx).worktree_store(),
                project.downgrade(),
                cx,
            )
        });

        // Ensure that mcp-1 starts up
        {
            let _server_events = assert_server_events(
                &store,
                vec![
                    (server_1_id.clone(), ContextServerStatus::Starting),
                    (server_1_id.clone(), ContextServerStatus::Running),
                ],
                cx,
            );
            cx.run_until_parked();
        }

        // Ensure that mcp-1 is restarted when the configuration was changed
        {
            let _server_events = assert_server_events(
                &store,
                vec![
                    (server_1_id.clone(), ContextServerStatus::Stopped),
                    (server_1_id.clone(), ContextServerStatus::Starting),
                    (server_1_id.clone(), ContextServerStatus::Running),
                ],
                cx,
            );
            set_context_server_configuration(
                vec![(
                    server_1_id.0.clone(),
                    settings::ContextServerSettingsContent::Extension {
                        enabled: true,
                        settings: json!({
                            "somevalue": false
                        }),
                    },
                )],
                cx,
            );

            cx.run_until_parked();
        }

        // Ensure that mcp-1 is not restarted when the configuration was not changed
        {
            let _server_events = assert_server_events(&store, vec![], cx);
            set_context_server_configuration(
                vec![(
                    server_1_id.0.clone(),
                    settings::ContextServerSettingsContent::Extension {
                        enabled: true,
                        settings: json!({
                            "somevalue": false
                        }),
                    },
                )],
                cx,
            );

            cx.run_until_parked();
        }

        // Ensure that mcp-2 is started once it is added to the settings
        {
            let _server_events = assert_server_events(
                &store,
                vec![
                    (server_2_id.clone(), ContextServerStatus::Starting),
                    (server_2_id.clone(), ContextServerStatus::Running),
                ],
                cx,
            );
            set_context_server_configuration(
                vec![
                    (
                        server_1_id.0.clone(),
                        settings::ContextServerSettingsContent::Extension {
                            enabled: true,
                            settings: json!({
                                "somevalue": false
                            }),
                        },
                    ),
                    (
                        server_2_id.0.clone(),
                        settings::ContextServerSettingsContent::Stdio {
                            enabled: true,
                            command: ContextServerCommand {
                                path: "somebinary".into(),
                                args: vec!["arg".to_string()],
                                env: None,
                                timeout: None,
                            },
                        },
                    ),
                ],
                cx,
            );

            cx.run_until_parked();
        }

        // Ensure that mcp-2 is restarted once the args have changed
        {
            let _server_events = assert_server_events(
                &store,
                vec![
                    (server_2_id.clone(), ContextServerStatus::Stopped),
                    (server_2_id.clone(), ContextServerStatus::Starting),
                    (server_2_id.clone(), ContextServerStatus::Running),
                ],
                cx,
            );
            set_context_server_configuration(
                vec![
                    (
                        server_1_id.0.clone(),
                        settings::ContextServerSettingsContent::Extension {
                            enabled: true,
                            settings: json!({
                                "somevalue": false
                            }),
                        },
                    ),
                    (
                        server_2_id.0.clone(),
                        settings::ContextServerSettingsContent::Stdio {
                            enabled: true,
                            command: ContextServerCommand {
                                path: "somebinary".into(),
                                args: vec!["anotherArg".to_string()],
                                env: None,
                                timeout: None,
                            },
                        },
                    ),
                ],
                cx,
            );

            cx.run_until_parked();
        }

        // Ensure that mcp-2 is removed once it is removed from the settings
        {
            let _server_events = assert_server_events(
                &store,
                vec![(server_2_id.clone(), ContextServerStatus::Stopped)],
                cx,
            );
            set_context_server_configuration(
                vec![(
                    server_1_id.0.clone(),
                    settings::ContextServerSettingsContent::Extension {
                        enabled: true,
                        settings: json!({
                            "somevalue": false
                        }),
                    },
                )],
                cx,
            );

            cx.run_until_parked();

            // Removal drops the server from the store entirely.
            cx.update(|cx| {
                assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
            });
        }

        // Ensure that nothing happens if the settings do not change
        {
            let _server_events = assert_server_events(&store, vec![], cx);
            set_context_server_configuration(
                vec![(
                    server_1_id.0.clone(),
                    settings::ContextServerSettingsContent::Extension {
                        enabled: true,
                        settings: json!({
                            "somevalue": false
                        }),
                    },
                )],
                cx,
            );

            cx.run_until_parked();

            cx.update(|cx| {
                assert_eq!(
                    store.read(cx).status_for_server(&server_1_id),
                    Some(ContextServerStatus::Running)
                );
                assert_eq!(store.read(cx).status_for_server(&server_2_id), None);
            });
        }
    }
1157
    /// Verifies that toggling a server's `enabled` flag in the settings stops
    /// the server and later starts it again.
    #[gpui::test]
    async fn test_context_server_enabled_disabled(cx: &mut TestAppContext) {
        const SERVER_1_ID: &str = "mcp-1";

        let server_1_id = ContextServerId(SERVER_1_ID.into());

        let (_fs, project) = setup_context_server_test(
            cx,
            json!({"code.rs": ""}),
            vec![(
                SERVER_1_ID.into(),
                ContextServerSettings::Stdio {
                    enabled: true,
                    command: ContextServerCommand {
                        path: "somebinary".into(),
                        args: vec!["arg".to_string()],
                        env: None,
                        timeout: None,
                    },
                },
            )],
        )
        .await;

        let executor = cx.executor();
        let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
        // The factory replaces real server construction with fake transports.
        let store = cx.new(|cx| {
            ContextServerStore::test_maintain_server_loop(
                Some(Box::new(move |id, _| {
                    Arc::new(ContextServer::new(
                        id.clone(),
                        Arc::new(create_fake_transport(id.0.to_string(), executor.clone())),
                    ))
                })),
                registry.clone(),
                project.read(cx).worktree_store(),
                project.downgrade(),
                cx,
            )
        });

        // Ensure that mcp-1 starts up
        {
            let _server_events = assert_server_events(
                &store,
                vec![
                    (server_1_id.clone(), ContextServerStatus::Starting),
                    (server_1_id.clone(), ContextServerStatus::Running),
                ],
                cx,
            );
            cx.run_until_parked();
        }

        // Ensure that mcp-1 is stopped once it is disabled.
        {
            let _server_events = assert_server_events(
                &store,
                vec![(server_1_id.clone(), ContextServerStatus::Stopped)],
                cx,
            );
            set_context_server_configuration(
                vec![(
                    server_1_id.0.clone(),
                    settings::ContextServerSettingsContent::Stdio {
                        enabled: false,
                        command: ContextServerCommand {
                            path: "somebinary".into(),
                            args: vec!["arg".to_string()],
                            env: None,
                            timeout: None,
                        },
                    },
                )],
                cx,
            );

            cx.run_until_parked();
        }

        // Ensure that mcp-1 is started once it is enabled again.
        {
            let _server_events = assert_server_events(
                &store,
                vec![
                    (server_1_id.clone(), ContextServerStatus::Starting),
                    (server_1_id.clone(), ContextServerStatus::Running),
                ],
                cx,
            );
            set_context_server_configuration(
                vec![(
                    server_1_id.0.clone(),
                    settings::ContextServerSettingsContent::Stdio {
                        enabled: true,
                        command: ContextServerCommand {
                            path: "somebinary".into(),
                            args: vec!["arg".to_string()],
                            timeout: None,
                            env: None,
                        },
                    },
                )],
                cx,
            );

            cx.run_until_parked();
        }
    }
1267
1268 fn set_context_server_configuration(
1269 context_servers: Vec<(Arc<str>, settings::ContextServerSettingsContent)>,
1270 cx: &mut TestAppContext,
1271 ) {
1272 cx.update(|cx| {
1273 SettingsStore::update_global(cx, |store, cx| {
1274 store.update_user_settings(cx, |content| {
1275 content.project.context_servers.clear();
1276 for (id, config) in context_servers {
1277 content.project.context_servers.insert(id, config);
1278 }
1279 });
1280 })
1281 });
1282 }
1283
1284 #[gpui::test]
1285 async fn test_remote_context_server(cx: &mut TestAppContext) {
1286 const SERVER_ID: &str = "remote-server";
1287 let server_id = ContextServerId(SERVER_ID.into());
1288 let server_url = "http://example.com/api";
1289
1290 let (_fs, project) = setup_context_server_test(
1291 cx,
1292 json!({ "code.rs": "" }),
1293 vec![(
1294 SERVER_ID.into(),
1295 ContextServerSettings::Http {
1296 enabled: true,
1297 url: server_url.to_string(),
1298 headers: Default::default(),
1299 timeout: None,
1300 },
1301 )],
1302 )
1303 .await;
1304
1305 let client = FakeHttpClient::create(|_| async move {
1306 use http_client::AsyncBody;
1307
1308 let response = Response::builder()
1309 .status(200)
1310 .header("Content-Type", "application/json")
1311 .body(AsyncBody::from(
1312 serde_json::to_string(&json!({
1313 "jsonrpc": "2.0",
1314 "id": 0,
1315 "result": {
1316 "protocolVersion": "2024-11-05",
1317 "capabilities": {},
1318 "serverInfo": {
1319 "name": "test-server",
1320 "version": "1.0.0"
1321 }
1322 }
1323 }))
1324 .unwrap(),
1325 ))
1326 .unwrap();
1327 Ok(response)
1328 });
1329 cx.update(|cx| cx.set_http_client(client));
1330 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1331 let store = cx.new(|cx| {
1332 ContextServerStore::test_maintain_server_loop(
1333 None,
1334 registry.clone(),
1335 project.read(cx).worktree_store(),
1336 project.downgrade(),
1337 cx,
1338 )
1339 });
1340
1341 let _server_events = assert_server_events(
1342 &store,
1343 vec![
1344 (server_id.clone(), ContextServerStatus::Starting),
1345 (server_id.clone(), ContextServerStatus::Running),
1346 ],
1347 cx,
1348 );
1349 cx.run_until_parked();
1350 }
1351
1352 struct ServerEvents {
1353 received_event_count: Rc<RefCell<usize>>,
1354 expected_event_count: usize,
1355 _subscription: Subscription,
1356 }
1357
1358 impl Drop for ServerEvents {
1359 fn drop(&mut self) {
1360 let actual_event_count = *self.received_event_count.borrow();
1361 assert_eq!(
1362 actual_event_count, self.expected_event_count,
1363 "
1364 Expected to receive {} context server store events, but received {} events",
1365 self.expected_event_count, actual_event_count
1366 );
1367 }
1368 }
1369
1370 #[gpui::test]
1371 async fn test_context_server_global_timeout(cx: &mut TestAppContext) {
1372 // Configure global timeout to 90 seconds
1373 cx.update(|cx| {
1374 let settings_store = SettingsStore::test(cx);
1375 cx.set_global(settings_store);
1376 SettingsStore::update_global(cx, |store, cx| {
1377 store
1378 .set_user_settings(r#"{"context_server_timeout": 90000}"#, cx)
1379 .expect("Failed to set test user settings");
1380 });
1381 });
1382
1383 let (_fs, project) = setup_context_server_test(cx, json!({"code.rs": ""}), vec![]).await;
1384
1385 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1386 let store = cx.new(|cx| {
1387 ContextServerStore::test(
1388 registry.clone(),
1389 project.read(cx).worktree_store(),
1390 project.downgrade(),
1391 cx,
1392 )
1393 });
1394
1395 // Test that create_context_server applies global timeout
1396 let result = store.update(cx, |store, cx| {
1397 store.create_context_server(
1398 ContextServerId("test-server".into()),
1399 Arc::new(ContextServerConfiguration::Http {
1400 url: url::Url::parse("http://localhost:8080")
1401 .expect("Failed to parse test URL"),
1402 headers: Default::default(),
1403 timeout: None, // Should use global timeout of 90 seconds
1404 }),
1405 cx,
1406 )
1407 });
1408
1409 assert!(
1410 result.is_ok(),
1411 "Server should be created successfully with global timeout"
1412 );
1413 }
1414
1415 #[gpui::test]
1416 async fn test_context_server_per_server_timeout_override(cx: &mut TestAppContext) {
1417 const SERVER_ID: &str = "test-server";
1418
1419 // Configure global timeout to 60 seconds
1420 cx.update(|cx| {
1421 let settings_store = SettingsStore::test(cx);
1422 cx.set_global(settings_store);
1423 SettingsStore::update_global(cx, |store, cx| {
1424 store
1425 .set_user_settings(r#"{"context_server_timeout": 60000}"#, cx)
1426 .expect("Failed to set test user settings");
1427 });
1428 });
1429
1430 let (_fs, project) = setup_context_server_test(
1431 cx,
1432 json!({"code.rs": ""}),
1433 vec![(
1434 SERVER_ID.into(),
1435 ContextServerSettings::Http {
1436 enabled: true,
1437 url: "http://localhost:8080".to_string(),
1438 headers: Default::default(),
1439 timeout: Some(120000), // Override to 120 seconds
1440 },
1441 )],
1442 )
1443 .await;
1444
1445 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1446 let store = cx.new(|cx| {
1447 ContextServerStore::test(
1448 registry.clone(),
1449 project.read(cx).worktree_store(),
1450 project.downgrade(),
1451 cx,
1452 )
1453 });
1454
1455 // Test that create_context_server applies per-server timeout override
1456 let result = store.update(cx, |store, cx| {
1457 store.create_context_server(
1458 ContextServerId("test-server".into()),
1459 Arc::new(ContextServerConfiguration::Http {
1460 url: url::Url::parse("http://localhost:8080")
1461 .expect("Failed to parse test URL"),
1462 headers: Default::default(),
1463 timeout: Some(120000), // Override: should use 120 seconds, not global 60
1464 }),
1465 cx,
1466 )
1467 });
1468
1469 assert!(
1470 result.is_ok(),
1471 "Server should be created successfully with per-server timeout override"
1472 );
1473 }
1474
1475 #[gpui::test]
1476 async fn test_context_server_stdio_timeout(cx: &mut TestAppContext) {
1477 const SERVER_ID: &str = "stdio-server";
1478
1479 let (_fs, project) = setup_context_server_test(
1480 cx,
1481 json!({"code.rs": ""}),
1482 vec![(
1483 SERVER_ID.into(),
1484 ContextServerSettings::Stdio {
1485 enabled: true,
1486 command: ContextServerCommand {
1487 path: "/usr/bin/node".into(),
1488 args: vec!["server.js".into()],
1489 env: None,
1490 timeout: Some(180000), // 3 minutes
1491 },
1492 },
1493 )],
1494 )
1495 .await;
1496
1497 let registry = cx.new(|_| ContextServerDescriptorRegistry::new());
1498 let store = cx.new(|cx| {
1499 ContextServerStore::test(
1500 registry.clone(),
1501 project.read(cx).worktree_store(),
1502 project.downgrade(),
1503 cx,
1504 )
1505 });
1506
1507 // Test that create_context_server works with stdio timeout
1508 let result = store.update(cx, |store, cx| {
1509 store.create_context_server(
1510 ContextServerId("stdio-server".into()),
1511 Arc::new(ContextServerConfiguration::Custom {
1512 command: ContextServerCommand {
1513 path: "/usr/bin/node".into(),
1514 args: vec!["server.js".into()],
1515 env: None,
1516 timeout: Some(180000), // 3 minutes
1517 },
1518 }),
1519 cx,
1520 )
1521 });
1522
1523 assert!(
1524 result.is_ok(),
1525 "Stdio server should be created successfully with timeout"
1526 );
1527 }
1528
1529 fn dummy_server_settings() -> ContextServerSettings {
1530 ContextServerSettings::Stdio {
1531 enabled: true,
1532 command: ContextServerCommand {
1533 path: "somebinary".into(),
1534 args: vec!["arg".to_string()],
1535 env: None,
1536 timeout: None,
1537 },
1538 }
1539 }
1540
1541 fn assert_server_events(
1542 store: &Entity<ContextServerStore>,
1543 expected_events: Vec<(ContextServerId, ContextServerStatus)>,
1544 cx: &mut TestAppContext,
1545 ) -> ServerEvents {
1546 cx.update(|cx| {
1547 let mut ix = 0;
1548 let received_event_count = Rc::new(RefCell::new(0));
1549 let expected_event_count = expected_events.len();
1550 let subscription = cx.subscribe(store, {
1551 let received_event_count = received_event_count.clone();
1552 move |_, event, _| match event {
1553 Event::ServerStatusChanged {
1554 server_id: actual_server_id,
1555 status: actual_status,
1556 } => {
1557 let (expected_server_id, expected_status) = &expected_events[ix];
1558
1559 assert_eq!(
1560 actual_server_id, expected_server_id,
1561 "Expected different server id at index {}",
1562 ix
1563 );
1564 assert_eq!(
1565 actual_status, expected_status,
1566 "Expected different status at index {}",
1567 ix
1568 );
1569 ix += 1;
1570 *received_event_count.borrow_mut() += 1;
1571 }
1572 }
1573 });
1574 ServerEvents {
1575 expected_event_count,
1576 received_event_count,
1577 _subscription: subscription,
1578 }
1579 })
1580 }
1581
1582 async fn setup_context_server_test(
1583 cx: &mut TestAppContext,
1584 files: serde_json::Value,
1585 context_server_configurations: Vec<(Arc<str>, ContextServerSettings)>,
1586 ) -> (Arc<FakeFs>, Entity<Project>) {
1587 cx.update(|cx| {
1588 let settings_store = SettingsStore::test(cx);
1589 cx.set_global(settings_store);
1590 let mut settings = ProjectSettings::get_global(cx).clone();
1591 for (id, config) in context_server_configurations {
1592 settings.context_servers.insert(id, config);
1593 }
1594 ProjectSettings::override_global(settings, cx);
1595 });
1596
1597 let fs = FakeFs::new(cx.executor());
1598 fs.insert_tree(path!("/test"), files).await;
1599 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
1600
1601 (fs, project)
1602 }
1603
/// Test descriptor that stores only the path handed to its constructor.
struct FakeContextServerDescriptor {
    path: PathBuf,
}

impl FakeContextServerDescriptor {
    /// Creates a descriptor from anything convertible into a `PathBuf`.
    fn new(path: impl Into<PathBuf>) -> Self {
        let path = path.into();
        Self { path }
    }
}
1613
1614 impl ContextServerDescriptor for FakeContextServerDescriptor {
1615 fn command(
1616 &self,
1617 _worktree_store: Entity<WorktreeStore>,
1618 _cx: &AsyncApp,
1619 ) -> Task<Result<ContextServerCommand>> {
1620 Task::ready(Ok(ContextServerCommand {
1621 path: self.path.clone(),
1622 args: vec!["arg1".to_string(), "arg2".to_string()],
1623 env: None,
1624 timeout: None,
1625 }))
1626 }
1627
1628 fn configuration(
1629 &self,
1630 _worktree_store: Entity<WorktreeStore>,
1631 _cx: &AsyncApp,
1632 ) -> Task<Result<Option<::extension::ContextServerConfiguration>>> {
1633 Task::ready(Ok(None))
1634 }
1635 }
1636}