manager.rs

  1//! This module implements a context server management system for Zed.
  2//!
  3//! It provides functionality to:
  4//! - Define and load context server settings
  5//! - Manage individual context servers (start, stop, restart)
  6//! - Maintain a global manager for all context servers
  7//!
  8//! Key components:
  9//! - `ContextServerSettings`: Defines the structure for server configurations
 10//! - `ContextServer`: Represents an individual context server
 11//! - `ContextServerManager`: Manages multiple context servers
 12//! - `GlobalContextServerManager`: Provides global access to the ContextServerManager
 13//!
 14//! The module also includes initialization logic to set up the context server system
 15//! and react to changes in settings.
 16
 17use collections::{HashMap, HashSet};
 18use gpui::{AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, Task};
 19use log;
 20use parking_lot::RwLock;
 21use schemars::JsonSchema;
 22use serde::{Deserialize, Serialize};
 23use settings::{Settings, SettingsSources, SettingsStore};
 24use std::path::Path;
 25use std::sync::Arc;
 26
 27use crate::{
 28    client::{self, Client},
 29    types,
 30};
 31
 32#[derive(Deserialize, Serialize, Default, Clone, PartialEq, Eq, JsonSchema, Debug)]
 33pub struct ContextServerSettings {
 34    pub servers: Vec<ServerConfig>,
 35}
 36
 37#[derive(Deserialize, Serialize, Clone, PartialEq, Eq, JsonSchema, Debug)]
 38pub struct ServerConfig {
 39    pub id: String,
 40    pub executable: String,
 41    pub args: Vec<String>,
 42    pub env: Option<HashMap<String, String>>,
 43}
 44
 45impl Settings for ContextServerSettings {
 46    const KEY: Option<&'static str> = Some("experimental.context_servers");
 47
 48    type FileContent = Self;
 49
 50    fn load(
 51        sources: SettingsSources<Self::FileContent>,
 52        _: &mut gpui::AppContext,
 53    ) -> anyhow::Result<Self> {
 54        sources.json_merge()
 55    }
 56}
 57
 58pub struct ContextServer {
 59    pub id: String,
 60    pub config: ServerConfig,
 61    pub client: RwLock<Option<Arc<crate::protocol::InitializedContextServerProtocol>>>,
 62}
 63
 64impl ContextServer {
 65    fn new(config: ServerConfig) -> Self {
 66        Self {
 67            id: config.id.clone(),
 68            config,
 69            client: RwLock::new(None),
 70        }
 71    }
 72
 73    async fn start(&self, cx: &AsyncAppContext) -> anyhow::Result<()> {
 74        log::info!("starting context server {}", self.config.id,);
 75        let client = Client::new(
 76            client::ContextServerId(self.config.id.clone()),
 77            client::ModelContextServerBinary {
 78                executable: Path::new(&self.config.executable).to_path_buf(),
 79                args: self.config.args.clone(),
 80                env: self.config.env.clone(),
 81            },
 82            cx.clone(),
 83        )?;
 84
 85        let protocol = crate::protocol::ModelContextProtocol::new(client);
 86        let client_info = types::EntityInfo {
 87            name: "Zed".to_string(),
 88            version: env!("CARGO_PKG_VERSION").to_string(),
 89        };
 90        let initialized_protocol = protocol.initialize(client_info).await?;
 91
 92        log::debug!(
 93            "context server {} initialized: {:?}",
 94            self.config.id,
 95            initialized_protocol.initialize,
 96        );
 97
 98        *self.client.write() = Some(Arc::new(initialized_protocol));
 99        Ok(())
100    }
101
102    async fn stop(&self) -> anyhow::Result<()> {
103        let mut client = self.client.write();
104        if let Some(protocol) = client.take() {
105            drop(protocol);
106        }
107        Ok(())
108    }
109}
110
111/// A Context server manager manages the starting and stopping
112/// of all servers. To obtain a server to interact with, a crate
113/// must go through the `GlobalContextServerManager` which holds
114/// a model to the ContextServerManager.
115pub struct ContextServerManager {
116    servers: HashMap<String, Arc<ContextServer>>,
117    pending_servers: HashSet<String>,
118}
119
120pub enum Event {
121    ServerStarted { server_id: String },
122    ServerStopped { server_id: String },
123}
124
125impl Global for ContextServerManager {}
126impl EventEmitter<Event> for ContextServerManager {}
127
128impl ContextServerManager {
129    pub fn new() -> Self {
130        Self {
131            servers: HashMap::default(),
132            pending_servers: HashSet::default(),
133        }
134    }
135    pub fn global(cx: &AppContext) -> Model<Self> {
136        cx.global::<GlobalContextServerManager>().0.clone()
137    }
138
139    pub fn add_server(
140        &mut self,
141        config: ServerConfig,
142        cx: &mut ModelContext<Self>,
143    ) -> Task<anyhow::Result<()>> {
144        let server_id = config.id.clone();
145        let server_id2 = config.id.clone();
146
147        if self.servers.contains_key(&server_id) || self.pending_servers.contains(&server_id) {
148            return Task::ready(Ok(()));
149        }
150
151        let task = cx.spawn(|this, mut cx| async move {
152            let server = Arc::new(ContextServer::new(config));
153            server.start(&cx).await?;
154            this.update(&mut cx, |this, cx| {
155                this.servers.insert(server_id.clone(), server);
156                this.pending_servers.remove(&server_id);
157                cx.emit(Event::ServerStarted {
158                    server_id: server_id.clone(),
159                });
160            })?;
161            Ok(())
162        });
163
164        self.pending_servers.insert(server_id2);
165        task
166    }
167
168    pub fn get_server(&self, id: &str) -> Option<Arc<ContextServer>> {
169        self.servers.get(id).cloned()
170    }
171
172    pub fn remove_server(
173        &mut self,
174        id: &str,
175        cx: &mut ModelContext<Self>,
176    ) -> Task<anyhow::Result<()>> {
177        let id = id.to_string();
178        cx.spawn(|this, mut cx| async move {
179            if let Some(server) = this.update(&mut cx, |this, _cx| this.servers.remove(&id))? {
180                server.stop().await?;
181            }
182            this.update(&mut cx, |this, cx| {
183                this.pending_servers.remove(&id);
184                cx.emit(Event::ServerStopped {
185                    server_id: id.clone(),
186                })
187            })?;
188            Ok(())
189        })
190    }
191
192    pub fn restart_server(
193        &mut self,
194        id: &str,
195        cx: &mut ModelContext<Self>,
196    ) -> Task<anyhow::Result<()>> {
197        let id = id.to_string();
198        cx.spawn(|this, mut cx| async move {
199            if let Some(server) = this.update(&mut cx, |this, _cx| this.servers.remove(&id))? {
200                server.stop().await?;
201                let config = server.config.clone();
202                let new_server = Arc::new(ContextServer::new(config));
203                new_server.start(&cx).await?;
204                this.update(&mut cx, |this, cx| {
205                    this.servers.insert(id.clone(), new_server);
206                    cx.emit(Event::ServerStopped {
207                        server_id: id.clone(),
208                    });
209                    cx.emit(Event::ServerStarted {
210                        server_id: id.clone(),
211                    });
212                })?;
213            }
214            Ok(())
215        })
216    }
217
218    pub fn servers(&self) -> Vec<Arc<ContextServer>> {
219        self.servers.values().cloned().collect()
220    }
221
222    pub fn model(cx: &mut AppContext) -> Model<Self> {
223        cx.new_model(|_cx| ContextServerManager::new())
224    }
225}
226
227pub struct GlobalContextServerManager(Model<ContextServerManager>);
228impl Global for GlobalContextServerManager {}
229
230impl GlobalContextServerManager {
231    fn register(cx: &mut AppContext) {
232        let model = ContextServerManager::model(cx);
233        cx.set_global(Self(model));
234    }
235}
236
237pub fn init(cx: &mut AppContext) {
238    ContextServerSettings::register(cx);
239    GlobalContextServerManager::register(cx);
240    cx.observe_global::<SettingsStore>(|cx| {
241        let manager = ContextServerManager::global(cx);
242        cx.update_model(&manager, |manager, cx| {
243            let settings = ContextServerSettings::get_global(cx);
244            let current_servers: HashMap<String, ServerConfig> = manager
245                .servers()
246                .into_iter()
247                .map(|server| (server.id.clone(), server.config.clone()))
248                .collect();
249
250            let new_servers = settings
251                .servers
252                .iter()
253                .map(|config| (config.id.clone(), config.clone()))
254                .collect::<HashMap<_, _>>();
255
256            let servers_to_add = new_servers
257                .values()
258                .filter(|config| !current_servers.contains_key(&config.id))
259                .cloned()
260                .collect::<Vec<_>>();
261
262            let servers_to_remove = current_servers
263                .keys()
264                .filter(|id| !new_servers.contains_key(*id))
265                .cloned()
266                .collect::<Vec<_>>();
267
268            log::trace!("servers_to_add={:?}", servers_to_add);
269            for config in servers_to_add {
270                manager.add_server(config, cx).detach_and_log_err(cx);
271            }
272
273            for id in servers_to_remove {
274                manager.remove_server(&id, cx).detach_and_log_err(cx);
275            }
276        })
277    })
278    .detach();
279}