1use anyhow::{anyhow, Result};
2use client::DevServerProjectId;
3use client::{user::UserStore, Client, ClientSettings};
4use fs::Fs;
5use futures::Future;
6use gpui::{AppContext, AsyncAppContext, Context, Global, Model, ModelContext, Task, WeakModel};
7use language::LanguageRegistry;
8use node_runtime::NodeRuntime;
9use postage::stream::Stream;
10use project::Project;
11use rpc::{proto, ErrorCode, TypedEnvelope};
12use settings::Settings;
13use std::{collections::HashMap, sync::Arc};
14use util::{ResultExt, TryFutureExt};
15
16pub struct DevServer {
17 client: Arc<Client>,
18 app_state: AppState,
19 remote_shutdown: bool,
20 projects: HashMap<DevServerProjectId, Model<Project>>,
21 _subscriptions: Vec<client::Subscription>,
22 _maintain_connection: Task<Option<()>>,
23}
24
25pub struct AppState {
26 pub node_runtime: Arc<dyn NodeRuntime>,
27 pub user_store: Model<UserStore>,
28 pub languages: Arc<LanguageRegistry>,
29 pub fs: Arc<dyn Fs>,
30}
31
32struct GlobalDevServer(Model<DevServer>);
33
34impl Global for GlobalDevServer {}
35
36pub fn init(client: Arc<Client>, app_state: AppState, cx: &mut AppContext) -> Task<Result<()>> {
37 let dev_server = cx.new_model(|cx| DevServer::new(client.clone(), app_state, cx));
38 cx.set_global(GlobalDevServer(dev_server.clone()));
39
40 #[cfg(not(target_os = "windows"))]
41 {
42 use signal_hook::consts::{SIGINT, SIGTERM};
43 use signal_hook::iterator::Signals;
44 // Set up a handler when the dev server is shut down
45 // with ctrl-c or kill
46 let (tx, rx) = futures::channel::oneshot::channel();
47 let mut signals = Signals::new(&[SIGTERM, SIGINT]).unwrap();
48 std::thread::spawn({
49 move || {
50 if let Some(sig) = signals.forever().next() {
51 tx.send(sig).log_err();
52 }
53 }
54 });
55 cx.spawn(|cx| async move {
56 if let Ok(sig) = rx.await {
57 log::info!("received signal {sig:?}");
58 cx.update(|cx| cx.quit()).log_err();
59 }
60 })
61 .detach();
62 }
63
64 let server_url = ClientSettings::get_global(&cx).server_url.clone();
65 cx.spawn(|cx| async move {
66 client
67 .authenticate_and_connect(false, &cx)
68 .await
69 .map_err(|e| anyhow!("Error connecting to '{}': {}", server_url, e))
70 })
71}
72
73impl DevServer {
74 pub fn global(cx: &AppContext) -> Model<DevServer> {
75 cx.global::<GlobalDevServer>().0.clone()
76 }
77
78 pub fn new(client: Arc<Client>, app_state: AppState, cx: &mut ModelContext<Self>) -> Self {
79 cx.on_app_quit(Self::app_will_quit).detach();
80
81 let maintain_connection = cx.spawn({
82 let client = client.clone();
83 move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
84 });
85
86 DevServer {
87 _subscriptions: vec![
88 client.add_message_handler(cx.weak_model(), Self::handle_dev_server_instructions),
89 client.add_request_handler(
90 cx.weak_model(),
91 Self::handle_validate_dev_server_project_request,
92 ),
93 client.add_message_handler(cx.weak_model(), Self::handle_shutdown),
94 ],
95 _maintain_connection: maintain_connection,
96 projects: Default::default(),
97 remote_shutdown: false,
98 app_state,
99 client,
100 }
101 }
102
103 fn app_will_quit(&mut self, _: &mut ModelContext<Self>) -> impl Future<Output = ()> {
104 let request = if self.remote_shutdown {
105 None
106 } else {
107 Some(
108 self.client
109 .request(proto::ShutdownDevServer { reason: None }),
110 )
111 };
112 async move {
113 if let Some(request) = request {
114 request.await.log_err();
115 }
116 }
117 }
118
119 async fn handle_dev_server_instructions(
120 this: Model<Self>,
121 envelope: TypedEnvelope<proto::DevServerInstructions>,
122 mut cx: AsyncAppContext,
123 ) -> Result<()> {
124 let (added_projects, removed_projects_ids) = this.read_with(&mut cx, |this, _| {
125 let removed_projects = this
126 .projects
127 .keys()
128 .filter(|dev_server_project_id| {
129 !envelope
130 .payload
131 .projects
132 .iter()
133 .any(|p| p.id == dev_server_project_id.0)
134 })
135 .cloned()
136 .collect::<Vec<_>>();
137
138 let added_projects = envelope
139 .payload
140 .projects
141 .into_iter()
142 .filter(|project| !this.projects.contains_key(&DevServerProjectId(project.id)))
143 .collect::<Vec<_>>();
144
145 (added_projects, removed_projects)
146 })?;
147
148 for dev_server_project in added_projects {
149 DevServer::share_project(this.clone(), &dev_server_project, &mut cx).await?;
150 }
151
152 this.update(&mut cx, |this, cx| {
153 for old_project_id in &removed_projects_ids {
154 this.unshare_project(old_project_id, cx)?;
155 }
156 Ok::<(), anyhow::Error>(())
157 })??;
158 Ok(())
159 }
160
161 async fn handle_validate_dev_server_project_request(
162 this: Model<Self>,
163 envelope: TypedEnvelope<proto::ValidateDevServerProjectRequest>,
164 cx: AsyncAppContext,
165 ) -> Result<proto::Ack> {
166 let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
167 let path = std::path::Path::new(&expanded);
168 let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
169
170 let path_exists = fs.metadata(path).await.is_ok_and(|result| result.is_some());
171 if !path_exists {
172 return Err(anyhow!(ErrorCode::DevServerProjectPathDoesNotExist))?;
173 }
174
175 Ok(proto::Ack {})
176 }
177
178 async fn handle_shutdown(
179 this: Model<Self>,
180 _envelope: TypedEnvelope<proto::ShutdownDevServer>,
181 mut cx: AsyncAppContext,
182 ) -> Result<()> {
183 this.update(&mut cx, |this, cx| {
184 this.remote_shutdown = true;
185 cx.quit();
186 })
187 }
188
189 fn unshare_project(
190 &mut self,
191 dev_server_project_id: &DevServerProjectId,
192 cx: &mut ModelContext<Self>,
193 ) -> Result<()> {
194 if let Some(project) = self.projects.remove(dev_server_project_id) {
195 project.update(cx, |project, cx| project.unshare(cx))?;
196 }
197 Ok(())
198 }
199
200 async fn share_project(
201 this: Model<Self>,
202 dev_server_project: &proto::DevServerProject,
203 cx: &mut AsyncAppContext,
204 ) -> Result<()> {
205 let (client, project) = this.update(cx, |this, cx| {
206 let project = Project::local(
207 this.client.clone(),
208 this.app_state.node_runtime.clone(),
209 this.app_state.user_store.clone(),
210 this.app_state.languages.clone(),
211 this.app_state.fs.clone(),
212 cx,
213 );
214
215 (this.client.clone(), project)
216 })?;
217
218 let path = shellexpand::tilde(&dev_server_project.path).to_string();
219
220 let (worktree, _) = project
221 .update(cx, |project, cx| {
222 project.find_or_create_local_worktree(&path, true, cx)
223 })?
224 .await?;
225
226 worktree.update(cx, |worktree, cx| {
227 worktree.as_local_mut().unwrap().share_private_files(cx)
228 })?;
229
230 let worktrees =
231 project.read_with(cx, |project, cx| project.worktree_metadata_protos(cx))?;
232
233 let response = client
234 .request(proto::ShareDevServerProject {
235 dev_server_project_id: dev_server_project.id,
236 worktrees,
237 })
238 .await?;
239
240 let project_id = response.project_id;
241 project.update(cx, |project, cx| project.shared(project_id, cx))??;
242 this.update(cx, |this, _| {
243 this.projects
244 .insert(DevServerProjectId(dev_server_project.id), project);
245 })?;
246 Ok(())
247 }
248
249 async fn maintain_connection(
250 this: WeakModel<Self>,
251 client: Arc<Client>,
252 mut cx: AsyncAppContext,
253 ) -> Result<()> {
254 let mut client_status = client.status();
255
256 let _ = client_status.try_recv();
257 let current_status = *client_status.borrow();
258 if current_status.is_connected() {
259 // wait for first disconnect
260 client_status.recv().await;
261 }
262
263 loop {
264 let Some(current_status) = client_status.recv().await else {
265 return Ok(());
266 };
267 let Some(this) = this.upgrade() else {
268 return Ok(());
269 };
270
271 if !current_status.is_connected() {
272 continue;
273 }
274
275 this.update(&mut cx, |this, cx| this.rejoin(cx))?.await?;
276 }
277 }
278
279 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
280 let mut projects: HashMap<u64, Model<Project>> = HashMap::default();
281 let request = self.client.request(proto::ReconnectDevServer {
282 reshared_projects: self
283 .projects
284 .iter()
285 .flat_map(|(_, handle)| {
286 let project = handle.read(cx);
287 let project_id = project.remote_id()?;
288 projects.insert(project_id, handle.clone());
289 Some(proto::UpdateProject {
290 project_id,
291 worktrees: project.worktree_metadata_protos(cx),
292 })
293 })
294 .collect(),
295 });
296 cx.spawn(|_, mut cx| async move {
297 let response = request.await?;
298
299 for reshared_project in response.reshared_projects {
300 if let Some(project) = projects.get(&reshared_project.id) {
301 project.update(&mut cx, |project, cx| {
302 project.reshared(reshared_project, cx).log_err();
303 })?;
304 }
305 }
306 Ok(())
307 })
308 }
309}