1use anyhow::{anyhow, Result};
2use client::DevServerProjectId;
3use client::{user::UserStore, Client, ClientSettings};
4use extension::ExtensionStore;
5use fs::Fs;
6use futures::{Future, StreamExt};
7use gpui::{AppContext, AsyncAppContext, Context, Global, Model, ModelContext, Task, WeakModel};
8use language::LanguageRegistry;
9use node_runtime::NodeRuntime;
10use postage::stream::Stream;
11use project::Project;
12use rpc::{proto, ErrorCode, TypedEnvelope};
13use settings::{Settings, SettingsStore};
14use std::path::Path;
15use std::{collections::HashMap, sync::Arc};
16use util::{ResultExt, TryFutureExt};
17
18pub struct DevServer {
19 client: Arc<Client>,
20 app_state: AppState,
21 remote_shutdown: bool,
22 projects: HashMap<DevServerProjectId, Model<Project>>,
23 _subscriptions: Vec<client::Subscription>,
24 _maintain_connection: Task<Option<()>>,
25}
26
27pub struct AppState {
28 pub node_runtime: NodeRuntime,
29 pub user_store: Model<UserStore>,
30 pub languages: Arc<LanguageRegistry>,
31 pub fs: Arc<dyn Fs>,
32}
33
34struct GlobalDevServer(Model<DevServer>);
35
36impl Global for GlobalDevServer {}
37
38pub fn init(client: Arc<Client>, app_state: AppState, cx: &mut AppContext) -> Task<Result<()>> {
39 let dev_server = cx.new_model(|cx| DevServer::new(client.clone(), app_state, cx));
40 cx.set_global(GlobalDevServer(dev_server.clone()));
41
42 #[cfg(not(target_os = "windows"))]
43 {
44 use signal_hook::consts::{SIGINT, SIGTERM};
45 use signal_hook::iterator::Signals;
46 // Set up a handler when the dev server is shut down
47 // with ctrl-c or kill
48 let (tx, rx) = futures::channel::oneshot::channel();
49 let mut signals = Signals::new([SIGTERM, SIGINT]).unwrap();
50 std::thread::spawn({
51 move || {
52 if let Some(sig) = signals.forever().next() {
53 tx.send(sig).log_err();
54 }
55 }
56 });
57 cx.spawn(|cx| async move {
58 if let Ok(sig) = rx.await {
59 log::info!("received signal {sig:?}");
60 cx.update(|cx| cx.quit()).log_err();
61 }
62 })
63 .detach();
64 }
65
66 let server_url = ClientSettings::get_global(cx).server_url.clone();
67 cx.spawn(|cx| async move {
68 client
69 .authenticate_and_connect(false, &cx)
70 .await
71 .map_err(|e| anyhow!("Error connecting to '{}': {}", server_url, e))
72 })
73}
74
75impl DevServer {
76 pub fn global(cx: &AppContext) -> Model<DevServer> {
77 cx.global::<GlobalDevServer>().0.clone()
78 }
79
80 pub fn new(client: Arc<Client>, app_state: AppState, cx: &mut ModelContext<Self>) -> Self {
81 cx.on_app_quit(Self::app_will_quit).detach();
82
83 let maintain_connection = cx.spawn({
84 let client = client.clone();
85 move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
86 });
87
88 cx.observe_global::<SettingsStore>(|_, cx| {
89 ExtensionStore::global(cx).update(cx, |store, cx| store.auto_install_extensions(cx))
90 })
91 .detach();
92
93 DevServer {
94 _subscriptions: vec![
95 client.add_message_handler(cx.weak_model(), Self::handle_dev_server_instructions),
96 client.add_request_handler(
97 cx.weak_model(),
98 Self::handle_validate_dev_server_project_request,
99 ),
100 client.add_request_handler(cx.weak_model(), Self::handle_list_remote_directory),
101 client.add_message_handler(cx.weak_model(), Self::handle_shutdown),
102 ],
103 _maintain_connection: maintain_connection,
104 projects: Default::default(),
105 remote_shutdown: false,
106 app_state,
107 client,
108 }
109 }
110
111 fn app_will_quit(&mut self, _: &mut ModelContext<Self>) -> impl Future<Output = ()> {
112 let request = if self.remote_shutdown {
113 None
114 } else {
115 Some(
116 self.client
117 .request(proto::ShutdownDevServer { reason: None }),
118 )
119 };
120 async move {
121 if let Some(request) = request {
122 request.await.log_err();
123 }
124 }
125 }
126
127 async fn handle_dev_server_instructions(
128 this: Model<Self>,
129 envelope: TypedEnvelope<proto::DevServerInstructions>,
130 mut cx: AsyncAppContext,
131 ) -> Result<()> {
132 let (added_projects, retained_projects, removed_projects_ids) =
133 this.read_with(&mut cx, |this, _| {
134 let removed_projects = this
135 .projects
136 .keys()
137 .filter(|dev_server_project_id| {
138 !envelope
139 .payload
140 .projects
141 .iter()
142 .any(|p| p.id == dev_server_project_id.0)
143 })
144 .cloned()
145 .collect::<Vec<_>>();
146
147 let mut added_projects = vec![];
148 let mut retained_projects = vec![];
149
150 for project in envelope.payload.projects.iter() {
151 if this.projects.contains_key(&DevServerProjectId(project.id)) {
152 retained_projects.push(project.clone());
153 } else {
154 added_projects.push(project.clone());
155 }
156 }
157
158 (added_projects, retained_projects, removed_projects)
159 })?;
160
161 for dev_server_project in added_projects {
162 DevServer::share_project(this.clone(), &dev_server_project, &mut cx).await?;
163 }
164
165 for dev_server_project in retained_projects {
166 DevServer::update_project(this.clone(), &dev_server_project, &mut cx).await?;
167 }
168
169 this.update(&mut cx, |this, cx| {
170 for old_project_id in &removed_projects_ids {
171 this.unshare_project(old_project_id, cx)?;
172 }
173 Ok::<(), anyhow::Error>(())
174 })??;
175 Ok(())
176 }
177
178 async fn handle_validate_dev_server_project_request(
179 this: Model<Self>,
180 envelope: TypedEnvelope<proto::ValidateDevServerProjectRequest>,
181 cx: AsyncAppContext,
182 ) -> Result<proto::Ack> {
183 let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
184 let path = std::path::Path::new(&expanded);
185 let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
186
187 let path_exists = fs.metadata(path).await.is_ok_and(|result| result.is_some());
188 if !path_exists {
189 return Err(anyhow!(ErrorCode::DevServerProjectPathDoesNotExist))?;
190 }
191
192 Ok(proto::Ack {})
193 }
194
195 async fn handle_list_remote_directory(
196 this: Model<Self>,
197 envelope: TypedEnvelope<proto::ListRemoteDirectory>,
198 cx: AsyncAppContext,
199 ) -> Result<proto::ListRemoteDirectoryResponse> {
200 let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
201 let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
202
203 let mut entries = Vec::new();
204 let mut response = fs.read_dir(Path::new(&expanded)).await?;
205 while let Some(path) = response.next().await {
206 if let Some(file_name) = path?.file_name() {
207 entries.push(file_name.to_string_lossy().to_string());
208 }
209 }
210 Ok(proto::ListRemoteDirectoryResponse { entries })
211 }
212
213 async fn handle_shutdown(
214 this: Model<Self>,
215 _envelope: TypedEnvelope<proto::ShutdownDevServer>,
216 mut cx: AsyncAppContext,
217 ) -> Result<()> {
218 this.update(&mut cx, |this, cx| {
219 this.remote_shutdown = true;
220 cx.quit();
221 })
222 }
223
224 fn unshare_project(
225 &mut self,
226 dev_server_project_id: &DevServerProjectId,
227 cx: &mut ModelContext<Self>,
228 ) -> Result<()> {
229 if let Some(project) = self.projects.remove(dev_server_project_id) {
230 project.update(cx, |project, cx| project.unshare(cx))?;
231 }
232 Ok(())
233 }
234
235 async fn share_project(
236 this: Model<Self>,
237 dev_server_project: &proto::DevServerProject,
238 cx: &mut AsyncAppContext,
239 ) -> Result<()> {
240 let (client, project) = this.update(cx, |this, cx| {
241 let project = Project::local(
242 this.client.clone(),
243 this.app_state.node_runtime.clone(),
244 this.app_state.user_store.clone(),
245 this.app_state.languages.clone(),
246 this.app_state.fs.clone(),
247 None,
248 cx,
249 );
250
251 (this.client.clone(), project)
252 })?;
253
254 for path in &dev_server_project.paths {
255 let path = shellexpand::tilde(path).to_string();
256
257 let (worktree, _) = project
258 .update(cx, |project, cx| {
259 project.find_or_create_worktree(&path, true, cx)
260 })?
261 .await?;
262
263 worktree.update(cx, |worktree, cx| {
264 worktree.as_local_mut().unwrap().share_private_files(cx)
265 })?;
266 }
267
268 let worktrees =
269 project.read_with(cx, |project, cx| project.worktree_metadata_protos(cx))?;
270
271 let response = client
272 .request(proto::ShareDevServerProject {
273 dev_server_project_id: dev_server_project.id,
274 worktrees,
275 })
276 .await?;
277
278 let project_id = response.project_id;
279 project.update(cx, |project, cx| project.shared(project_id, cx))??;
280 this.update(cx, |this, _| {
281 this.projects
282 .insert(DevServerProjectId(dev_server_project.id), project);
283 })?;
284 Ok(())
285 }
286
287 async fn update_project(
288 this: Model<Self>,
289 dev_server_project: &proto::DevServerProject,
290 cx: &mut AsyncAppContext,
291 ) -> Result<()> {
292 let tasks = this.update(cx, |this, cx| {
293 let Some(project) = this
294 .projects
295 .get(&DevServerProjectId(dev_server_project.id))
296 else {
297 return vec![];
298 };
299
300 let mut to_delete = vec![];
301 let mut tasks = vec![];
302
303 project.update(cx, |project, cx| {
304 for worktree in project.visible_worktrees(cx) {
305 let mut delete = true;
306 for config in dev_server_project.paths.iter() {
307 if worktree.read(cx).abs_path().to_string_lossy()
308 == shellexpand::tilde(config)
309 {
310 delete = false;
311 }
312 }
313 if delete {
314 to_delete.push(worktree.read(cx).id())
315 }
316 }
317
318 for worktree_id in to_delete {
319 project.remove_worktree(worktree_id, cx)
320 }
321
322 for config in dev_server_project.paths.iter() {
323 tasks.push(project.find_or_create_worktree(
324 shellexpand::tilde(config).to_string(),
325 true,
326 cx,
327 ));
328 }
329
330 tasks
331 })
332 })?;
333 futures::future::join_all(tasks).await;
334 Ok(())
335 }
336
337 async fn maintain_connection(
338 this: WeakModel<Self>,
339 client: Arc<Client>,
340 mut cx: AsyncAppContext,
341 ) -> Result<()> {
342 let mut client_status = client.status();
343
344 let _ = client_status.try_recv();
345 let current_status = *client_status.borrow();
346 if current_status.is_connected() {
347 // wait for first disconnect
348 client_status.recv().await;
349 }
350
351 loop {
352 let Some(current_status) = client_status.recv().await else {
353 return Ok(());
354 };
355 let Some(this) = this.upgrade() else {
356 return Ok(());
357 };
358
359 if !current_status.is_connected() {
360 continue;
361 }
362
363 this.update(&mut cx, |this, cx| this.rejoin(cx))?.await?;
364 }
365 }
366
367 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
368 let mut projects: HashMap<u64, Model<Project>> = HashMap::default();
369 let request = self.client.request(proto::ReconnectDevServer {
370 reshared_projects: self
371 .projects
372 .iter()
373 .flat_map(|(_, handle)| {
374 let project = handle.read(cx);
375 let project_id = project.remote_id()?;
376 projects.insert(project_id, handle.clone());
377 Some(proto::UpdateProject {
378 project_id,
379 worktrees: project.worktree_metadata_protos(cx),
380 })
381 })
382 .collect(),
383 });
384 cx.spawn(|_, mut cx| async move {
385 let response = request.await?;
386
387 for reshared_project in response.reshared_projects {
388 if let Some(project) = projects.get(&reshared_project.id) {
389 project.update(&mut cx, |project, cx| {
390 project.reshared(reshared_project, cx).log_err();
391 })?;
392 }
393 }
394 Ok(())
395 })
396 }
397}