1use anyhow::{anyhow, Result};
2use client::DevServerProjectId;
3use client::{user::UserStore, Client, ClientSettings};
4use extension::ExtensionStore;
5use fs::Fs;
6use futures::{Future, StreamExt};
7use gpui::{AppContext, AsyncAppContext, Context, Global, Model, ModelContext, Task, WeakModel};
8use language::LanguageRegistry;
9use node_runtime::NodeRuntime;
10use postage::stream::Stream;
11use project::Project;
12use rpc::{proto, ErrorCode, TypedEnvelope};
13use settings::{Settings, SettingsStore};
14use std::path::Path;
15use std::{collections::HashMap, sync::Arc};
16use util::{ResultExt, TryFutureExt};
17
18pub struct DevServer {
19 client: Arc<Client>,
20 app_state: AppState,
21 remote_shutdown: bool,
22 projects: HashMap<DevServerProjectId, Model<Project>>,
23 _subscriptions: Vec<client::Subscription>,
24 _maintain_connection: Task<Option<()>>,
25}
26
27pub struct AppState {
28 pub node_runtime: Arc<dyn NodeRuntime>,
29 pub user_store: Model<UserStore>,
30 pub languages: Arc<LanguageRegistry>,
31 pub fs: Arc<dyn Fs>,
32}
33
34struct GlobalDevServer(Model<DevServer>);
35
36impl Global for GlobalDevServer {}
37
38pub fn init(client: Arc<Client>, app_state: AppState, cx: &mut AppContext) -> Task<Result<()>> {
39 let dev_server = cx.new_model(|cx| DevServer::new(client.clone(), app_state, cx));
40 cx.set_global(GlobalDevServer(dev_server.clone()));
41
42 #[cfg(not(target_os = "windows"))]
43 {
44 use signal_hook::consts::{SIGINT, SIGTERM};
45 use signal_hook::iterator::Signals;
46 // Set up a handler when the dev server is shut down
47 // with ctrl-c or kill
48 let (tx, rx) = futures::channel::oneshot::channel();
49 let mut signals = Signals::new(&[SIGTERM, SIGINT]).unwrap();
50 std::thread::spawn({
51 move || {
52 if let Some(sig) = signals.forever().next() {
53 tx.send(sig).log_err();
54 }
55 }
56 });
57 cx.spawn(|cx| async move {
58 if let Ok(sig) = rx.await {
59 log::info!("received signal {sig:?}");
60 cx.update(|cx| cx.quit()).log_err();
61 }
62 })
63 .detach();
64 }
65
66 let server_url = ClientSettings::get_global(&cx).server_url.clone();
67 cx.spawn(|cx| async move {
68 client
69 .authenticate_and_connect(false, &cx)
70 .await
71 .map_err(|e| anyhow!("Error connecting to '{}': {}", server_url, e))
72 })
73}
74
75impl DevServer {
76 pub fn global(cx: &AppContext) -> Model<DevServer> {
77 cx.global::<GlobalDevServer>().0.clone()
78 }
79
80 pub fn new(client: Arc<Client>, app_state: AppState, cx: &mut ModelContext<Self>) -> Self {
81 cx.on_app_quit(Self::app_will_quit).detach();
82
83 let maintain_connection = cx.spawn({
84 let client = client.clone();
85 move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
86 });
87
88 cx.observe_global::<SettingsStore>(|_, cx| {
89 ExtensionStore::global(cx).update(cx, |store, cx| store.auto_install_extensions(cx))
90 })
91 .detach();
92
93 DevServer {
94 _subscriptions: vec![
95 client.add_message_handler(cx.weak_model(), Self::handle_dev_server_instructions),
96 client.add_request_handler(
97 cx.weak_model(),
98 Self::handle_validate_dev_server_project_request,
99 ),
100 client.add_request_handler(cx.weak_model(), Self::handle_list_remote_directory),
101 client.add_message_handler(cx.weak_model(), Self::handle_shutdown),
102 ],
103 _maintain_connection: maintain_connection,
104 projects: Default::default(),
105 remote_shutdown: false,
106 app_state,
107 client,
108 }
109 }
110
111 fn app_will_quit(&mut self, _: &mut ModelContext<Self>) -> impl Future<Output = ()> {
112 let request = if self.remote_shutdown {
113 None
114 } else {
115 Some(
116 self.client
117 .request(proto::ShutdownDevServer { reason: None }),
118 )
119 };
120 async move {
121 if let Some(request) = request {
122 request.await.log_err();
123 }
124 }
125 }
126
127 async fn handle_dev_server_instructions(
128 this: Model<Self>,
129 envelope: TypedEnvelope<proto::DevServerInstructions>,
130 mut cx: AsyncAppContext,
131 ) -> Result<()> {
132 let (added_projects, retained_projects, removed_projects_ids) =
133 this.read_with(&mut cx, |this, _| {
134 let removed_projects = this
135 .projects
136 .keys()
137 .filter(|dev_server_project_id| {
138 !envelope
139 .payload
140 .projects
141 .iter()
142 .any(|p| p.id == dev_server_project_id.0)
143 })
144 .cloned()
145 .collect::<Vec<_>>();
146
147 let mut added_projects = vec![];
148 let mut retained_projects = vec![];
149
150 for project in envelope.payload.projects.iter() {
151 if this.projects.contains_key(&DevServerProjectId(project.id)) {
152 retained_projects.push(project.clone());
153 } else {
154 added_projects.push(project.clone());
155 }
156 }
157
158 (added_projects, retained_projects, removed_projects)
159 })?;
160
161 for dev_server_project in added_projects {
162 DevServer::share_project(this.clone(), &dev_server_project, &mut cx).await?;
163 }
164
165 for dev_server_project in retained_projects {
166 DevServer::update_project(this.clone(), &dev_server_project, &mut cx).await?;
167 }
168
169 this.update(&mut cx, |this, cx| {
170 for old_project_id in &removed_projects_ids {
171 this.unshare_project(old_project_id, cx)?;
172 }
173 Ok::<(), anyhow::Error>(())
174 })??;
175 Ok(())
176 }
177
178 async fn handle_validate_dev_server_project_request(
179 this: Model<Self>,
180 envelope: TypedEnvelope<proto::ValidateDevServerProjectRequest>,
181 cx: AsyncAppContext,
182 ) -> Result<proto::Ack> {
183 let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
184 let path = std::path::Path::new(&expanded);
185 let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
186
187 let path_exists = fs.metadata(path).await.is_ok_and(|result| result.is_some());
188 if !path_exists {
189 return Err(anyhow!(ErrorCode::DevServerProjectPathDoesNotExist))?;
190 }
191
192 Ok(proto::Ack {})
193 }
194
195 async fn handle_list_remote_directory(
196 this: Model<Self>,
197 envelope: TypedEnvelope<proto::ListRemoteDirectory>,
198 cx: AsyncAppContext,
199 ) -> Result<proto::ListRemoteDirectoryResponse> {
200 let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
201 let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
202
203 let mut entries = Vec::new();
204 let mut response = fs.read_dir(Path::new(&expanded)).await?;
205 while let Some(path) = response.next().await {
206 if let Some(file_name) = path?.file_name() {
207 entries.push(file_name.to_string_lossy().to_string());
208 }
209 }
210 Ok(proto::ListRemoteDirectoryResponse { entries })
211 }
212
213 async fn handle_shutdown(
214 this: Model<Self>,
215 _envelope: TypedEnvelope<proto::ShutdownDevServer>,
216 mut cx: AsyncAppContext,
217 ) -> Result<()> {
218 this.update(&mut cx, |this, cx| {
219 this.remote_shutdown = true;
220 cx.quit();
221 })
222 }
223
224 fn unshare_project(
225 &mut self,
226 dev_server_project_id: &DevServerProjectId,
227 cx: &mut ModelContext<Self>,
228 ) -> Result<()> {
229 if let Some(project) = self.projects.remove(dev_server_project_id) {
230 project.update(cx, |project, cx| project.unshare(cx))?;
231 }
232 Ok(())
233 }
234
235 async fn share_project(
236 this: Model<Self>,
237 dev_server_project: &proto::DevServerProject,
238 cx: &mut AsyncAppContext,
239 ) -> Result<()> {
240 let (client, project) = this.update(cx, |this, cx| {
241 let project = Project::local(
242 this.client.clone(),
243 this.app_state.node_runtime.clone(),
244 this.app_state.user_store.clone(),
245 this.app_state.languages.clone(),
246 this.app_state.fs.clone(),
247 cx,
248 );
249
250 (this.client.clone(), project)
251 })?;
252
253 for path in &dev_server_project.paths {
254 let path = shellexpand::tilde(path).to_string();
255
256 let (worktree, _) = project
257 .update(cx, |project, cx| {
258 project.find_or_create_worktree(&path, true, cx)
259 })?
260 .await?;
261
262 worktree.update(cx, |worktree, cx| {
263 worktree.as_local_mut().unwrap().share_private_files(cx)
264 })?;
265 }
266
267 let worktrees =
268 project.read_with(cx, |project, cx| project.worktree_metadata_protos(cx))?;
269
270 let response = client
271 .request(proto::ShareDevServerProject {
272 dev_server_project_id: dev_server_project.id,
273 worktrees,
274 })
275 .await?;
276
277 let project_id = response.project_id;
278 project.update(cx, |project, cx| project.shared(project_id, cx))??;
279 this.update(cx, |this, _| {
280 this.projects
281 .insert(DevServerProjectId(dev_server_project.id), project);
282 })?;
283 Ok(())
284 }
285
286 async fn update_project(
287 this: Model<Self>,
288 dev_server_project: &proto::DevServerProject,
289 cx: &mut AsyncAppContext,
290 ) -> Result<()> {
291 let tasks = this.update(cx, |this, cx| {
292 let Some(project) = this
293 .projects
294 .get(&DevServerProjectId(dev_server_project.id))
295 else {
296 return vec![];
297 };
298
299 let mut to_delete = vec![];
300 let mut tasks = vec![];
301
302 project.update(cx, |project, cx| {
303 for worktree in project.visible_worktrees(cx) {
304 let mut delete = true;
305 for config in dev_server_project.paths.iter() {
306 if worktree.read(cx).abs_path().to_string_lossy()
307 == shellexpand::tilde(config)
308 {
309 delete = false;
310 }
311 }
312 if delete {
313 to_delete.push(worktree.read(cx).id())
314 }
315 }
316
317 for worktree_id in to_delete {
318 project.remove_worktree(worktree_id, cx)
319 }
320
321 for config in dev_server_project.paths.iter() {
322 tasks.push(project.find_or_create_worktree(
323 &shellexpand::tilde(config).to_string(),
324 true,
325 cx,
326 ));
327 }
328
329 tasks
330 })
331 })?;
332 futures::future::join_all(tasks).await;
333 Ok(())
334 }
335
336 async fn maintain_connection(
337 this: WeakModel<Self>,
338 client: Arc<Client>,
339 mut cx: AsyncAppContext,
340 ) -> Result<()> {
341 let mut client_status = client.status();
342
343 let _ = client_status.try_recv();
344 let current_status = *client_status.borrow();
345 if current_status.is_connected() {
346 // wait for first disconnect
347 client_status.recv().await;
348 }
349
350 loop {
351 let Some(current_status) = client_status.recv().await else {
352 return Ok(());
353 };
354 let Some(this) = this.upgrade() else {
355 return Ok(());
356 };
357
358 if !current_status.is_connected() {
359 continue;
360 }
361
362 this.update(&mut cx, |this, cx| this.rejoin(cx))?.await?;
363 }
364 }
365
366 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
367 let mut projects: HashMap<u64, Model<Project>> = HashMap::default();
368 let request = self.client.request(proto::ReconnectDevServer {
369 reshared_projects: self
370 .projects
371 .iter()
372 .flat_map(|(_, handle)| {
373 let project = handle.read(cx);
374 let project_id = project.remote_id()?;
375 projects.insert(project_id, handle.clone());
376 Some(proto::UpdateProject {
377 project_id,
378 worktrees: project.worktree_metadata_protos(cx),
379 })
380 })
381 .collect(),
382 });
383 cx.spawn(|_, mut cx| async move {
384 let response = request.await?;
385
386 for reshared_project in response.reshared_projects {
387 if let Some(project) = projects.get(&reshared_project.id) {
388 project.update(&mut cx, |project, cx| {
389 project.reshared(reshared_project, cx).log_err();
390 })?;
391 }
392 }
393 Ok(())
394 })
395 }
396}