1use anyhow::{anyhow, Result};
2use client::DevServerProjectId;
3use client::{user::UserStore, Client, ClientSettings};
4use fs::Fs;
5use futures::Future;
6use gpui::{AppContext, AsyncAppContext, Context, Global, Model, ModelContext, Task, WeakModel};
7use language::LanguageRegistry;
8use node_runtime::NodeRuntime;
9use postage::stream::Stream;
10use project::Project;
11use rpc::{proto, ErrorCode, TypedEnvelope};
12use settings::Settings;
13use std::{collections::HashMap, sync::Arc};
14use util::{ResultExt, TryFutureExt};
15
/// State of a running dev server: the client connection, the projects it
/// currently shares, and the background work that keeps the connection alive.
pub struct DevServer {
    /// Client used for every request this model sends.
    client: Arc<Client>,
    /// Shared services handed to each locally created `Project`.
    app_state: AppState,
    /// Set to `true` by `handle_shutdown`; `app_will_quit` then skips sending
    /// its own `ShutdownDevServer` request.
    remote_shutdown: bool,
    /// Projects currently shared by this dev server, keyed by dev server project id.
    projects: HashMap<DevServerProjectId, Model<Project>>,
    /// Message/request handler registrations created in `new`; stored here and
    /// never read again.
    _subscriptions: Vec<client::Subscription>,
    /// Task running `maintain_connection`; stored here and never read again.
    _maintain_connection: Task<Option<()>>,
}
24
/// Shared services the dev server passes to `Project::local` when it creates
/// a project to share.
pub struct AppState {
    /// Node runtime handed to each created project.
    pub node_runtime: Arc<dyn NodeRuntime>,
    /// User store model handed to each created project.
    pub user_store: Model<UserStore>,
    /// Language registry handed to each created project.
    pub languages: Arc<LanguageRegistry>,
    /// Filesystem handle; also used to validate requested project paths.
    pub fs: Arc<dyn Fs>,
}
31
/// Newtype wrapper that lets the `DevServer` model be stored as an app global.
struct GlobalDevServer(Model<DevServer>);

// Marker impl: allows `GlobalDevServer` to be used with `set_global` / `global`.
impl Global for GlobalDevServer {}
35
36pub fn init(client: Arc<Client>, app_state: AppState, cx: &mut AppContext) -> Task<Result<()>> {
37 let dev_server = cx.new_model(|cx| DevServer::new(client.clone(), app_state, cx));
38 cx.set_global(GlobalDevServer(dev_server.clone()));
39
40 #[cfg(not(target_os = "windows"))]
41 {
42 use signal_hook::consts::{SIGINT, SIGTERM};
43 use signal_hook::iterator::Signals;
44 // Set up a handler when the dev server is shut down
45 // with ctrl-c or kill
46 let (tx, rx) = futures::channel::oneshot::channel();
47 let mut signals = Signals::new(&[SIGTERM, SIGINT]).unwrap();
48 std::thread::spawn({
49 move || {
50 if let Some(sig) = signals.forever().next() {
51 tx.send(sig).log_err();
52 }
53 }
54 });
55 cx.spawn(|cx| async move {
56 if let Ok(sig) = rx.await {
57 log::info!("received signal {sig:?}");
58 cx.update(|cx| cx.quit()).log_err();
59 }
60 })
61 .detach();
62 }
63
64 let server_url = ClientSettings::get_global(&cx).server_url.clone();
65 cx.spawn(|cx| async move {
66 client
67 .authenticate_and_connect(false, &cx)
68 .await
69 .map_err(|e| anyhow!("Error connecting to '{}': {}", server_url, e))
70 })
71}
72
73impl DevServer {
74 pub fn global(cx: &AppContext) -> Model<DevServer> {
75 cx.global::<GlobalDevServer>().0.clone()
76 }
77
78 pub fn new(client: Arc<Client>, app_state: AppState, cx: &mut ModelContext<Self>) -> Self {
79 cx.on_app_quit(Self::app_will_quit).detach();
80
81 let maintain_connection = cx.spawn({
82 let client = client.clone();
83 move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
84 });
85
86 DevServer {
87 _subscriptions: vec![
88 client.add_message_handler(cx.weak_model(), Self::handle_dev_server_instructions),
89 client.add_request_handler(
90 cx.weak_model(),
91 Self::handle_validate_dev_server_project_request,
92 ),
93 client.add_message_handler(cx.weak_model(), Self::handle_shutdown),
94 ],
95 _maintain_connection: maintain_connection,
96 projects: Default::default(),
97 remote_shutdown: false,
98 app_state,
99 client,
100 }
101 }
102
103 fn app_will_quit(&mut self, _: &mut ModelContext<Self>) -> impl Future<Output = ()> {
104 let request = if self.remote_shutdown {
105 None
106 } else {
107 Some(
108 self.client
109 .request(proto::ShutdownDevServer { reason: None }),
110 )
111 };
112 async move {
113 if let Some(request) = request {
114 request.await.log_err();
115 }
116 }
117 }
118
119 async fn handle_dev_server_instructions(
120 this: Model<Self>,
121 envelope: TypedEnvelope<proto::DevServerInstructions>,
122 _: Arc<Client>,
123 mut cx: AsyncAppContext,
124 ) -> Result<()> {
125 let (added_projects, removed_projects_ids) = this.read_with(&mut cx, |this, _| {
126 let removed_projects = this
127 .projects
128 .keys()
129 .filter(|dev_server_project_id| {
130 !envelope
131 .payload
132 .projects
133 .iter()
134 .any(|p| p.id == dev_server_project_id.0)
135 })
136 .cloned()
137 .collect::<Vec<_>>();
138
139 let added_projects = envelope
140 .payload
141 .projects
142 .into_iter()
143 .filter(|project| !this.projects.contains_key(&DevServerProjectId(project.id)))
144 .collect::<Vec<_>>();
145
146 (added_projects, removed_projects)
147 })?;
148
149 for dev_server_project in added_projects {
150 DevServer::share_project(this.clone(), &dev_server_project, &mut cx).await?;
151 }
152
153 this.update(&mut cx, |this, cx| {
154 for old_project_id in &removed_projects_ids {
155 this.unshare_project(old_project_id, cx)?;
156 }
157 Ok::<(), anyhow::Error>(())
158 })??;
159 Ok(())
160 }
161
162 async fn handle_validate_dev_server_project_request(
163 this: Model<Self>,
164 envelope: TypedEnvelope<proto::ValidateDevServerProjectRequest>,
165 _: Arc<Client>,
166 cx: AsyncAppContext,
167 ) -> Result<proto::Ack> {
168 let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
169 let path = std::path::Path::new(&expanded);
170 let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
171
172 let path_exists = fs.metadata(path).await.is_ok_and(|result| result.is_some());
173 if !path_exists {
174 return Err(anyhow!(ErrorCode::DevServerProjectPathDoesNotExist))?;
175 }
176
177 Ok(proto::Ack {})
178 }
179
180 async fn handle_shutdown(
181 this: Model<Self>,
182 _envelope: TypedEnvelope<proto::ShutdownDevServer>,
183 _: Arc<Client>,
184 mut cx: AsyncAppContext,
185 ) -> Result<()> {
186 this.update(&mut cx, |this, cx| {
187 this.remote_shutdown = true;
188 cx.quit();
189 })
190 }
191
192 fn unshare_project(
193 &mut self,
194 dev_server_project_id: &DevServerProjectId,
195 cx: &mut ModelContext<Self>,
196 ) -> Result<()> {
197 if let Some(project) = self.projects.remove(dev_server_project_id) {
198 project.update(cx, |project, cx| project.unshare(cx))?;
199 }
200 Ok(())
201 }
202
203 async fn share_project(
204 this: Model<Self>,
205 dev_server_project: &proto::DevServerProject,
206 cx: &mut AsyncAppContext,
207 ) -> Result<()> {
208 let (client, project) = this.update(cx, |this, cx| {
209 let project = Project::local(
210 this.client.clone(),
211 this.app_state.node_runtime.clone(),
212 this.app_state.user_store.clone(),
213 this.app_state.languages.clone(),
214 this.app_state.fs.clone(),
215 cx,
216 );
217
218 (this.client.clone(), project)
219 })?;
220
221 let path = shellexpand::tilde(&dev_server_project.path).to_string();
222
223 let (worktree, _) = project
224 .update(cx, |project, cx| {
225 project.find_or_create_local_worktree(&path, true, cx)
226 })?
227 .await?;
228
229 worktree.update(cx, |worktree, cx| {
230 worktree.as_local_mut().unwrap().share_private_files(cx)
231 })?;
232
233 let worktrees =
234 project.read_with(cx, |project, cx| project.worktree_metadata_protos(cx))?;
235
236 let response = client
237 .request(proto::ShareDevServerProject {
238 dev_server_project_id: dev_server_project.id,
239 worktrees,
240 })
241 .await?;
242
243 let project_id = response.project_id;
244 project.update(cx, |project, cx| project.shared(project_id, cx))??;
245 this.update(cx, |this, _| {
246 this.projects
247 .insert(DevServerProjectId(dev_server_project.id), project);
248 })?;
249 Ok(())
250 }
251
252 async fn maintain_connection(
253 this: WeakModel<Self>,
254 client: Arc<Client>,
255 mut cx: AsyncAppContext,
256 ) -> Result<()> {
257 let mut client_status = client.status();
258
259 let _ = client_status.try_recv();
260 let current_status = *client_status.borrow();
261 if current_status.is_connected() {
262 // wait for first disconnect
263 client_status.recv().await;
264 }
265
266 loop {
267 let Some(current_status) = client_status.recv().await else {
268 return Ok(());
269 };
270 let Some(this) = this.upgrade() else {
271 return Ok(());
272 };
273
274 if !current_status.is_connected() {
275 continue;
276 }
277
278 this.update(&mut cx, |this, cx| this.rejoin(cx))?.await?;
279 }
280 }
281
282 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
283 let mut projects: HashMap<u64, Model<Project>> = HashMap::default();
284 let request = self.client.request(proto::ReconnectDevServer {
285 reshared_projects: self
286 .projects
287 .iter()
288 .flat_map(|(_, handle)| {
289 let project = handle.read(cx);
290 let project_id = project.remote_id()?;
291 projects.insert(project_id, handle.clone());
292 Some(proto::UpdateProject {
293 project_id,
294 worktrees: project.worktree_metadata_protos(cx),
295 })
296 })
297 .collect(),
298 });
299 cx.spawn(|_, mut cx| async move {
300 let response = request.await?;
301
302 for reshared_project in response.reshared_projects {
303 if let Some(project) = projects.get(&reshared_project.id) {
304 project.update(&mut cx, |project, cx| {
305 project.reshared(reshared_project, cx).log_err();
306 })?;
307 }
308 }
309 Ok(())
310 })
311 }
312}