1use anyhow::{anyhow, Result};
2use client::DevServerProjectId;
3use client::{user::UserStore, Client, ClientSettings};
4use fs::Fs;
5use futures::Future;
6use gpui::{
7 AppContext, AsyncAppContext, BorrowAppContext, Context, Global, Model, ModelContext, Task,
8 WeakModel,
9};
10use language::LanguageRegistry;
11use node_runtime::NodeRuntime;
12use postage::stream::Stream;
13use project::{Project, WorktreeSettings};
14use rpc::{proto, ErrorCode, TypedEnvelope};
15use settings::{Settings, SettingsStore};
16use std::{collections::HashMap, sync::Arc};
17use util::{ResultExt, TryFutureExt};
18
/// State of a running dev server: the RPC client, the projects currently
/// shared through it, and the background work that keeps it connected.
pub struct DevServer {
    /// RPC client used to send requests and to register the message handlers.
    client: Arc<Client>,
    /// Services handed to every `Project` created in `share_project`.
    app_state: AppState,
    /// Set by `handle_shutdown`; when true, `app_will_quit` does not send its
    /// own `ShutdownDevServer` request.
    remote_shutdown: bool,
    /// Projects that have been shared, keyed by dev server project id.
    /// Entries are inserted by `share_project` and removed by `unshare_project`.
    projects: HashMap<DevServerProjectId, Model<Project>>,
    /// Handler registrations created in `new`.
    /// NOTE(review): presumably held so the handlers stay registered for as
    /// long as the model lives — confirm against `client::Subscription`.
    _subscriptions: Vec<client::Subscription>,
    /// Task spawned in `new` that runs `maintain_connection`.
    _maintain_connection: Task<Option<()>>,
}
27
/// Application-level services the dev server needs; `share_project` passes
/// all four of them to `Project::local`.
pub struct AppState {
    pub node_runtime: Arc<dyn NodeRuntime>,
    pub user_store: Model<UserStore>,
    pub languages: Arc<LanguageRegistry>,
    /// Also used by the path-validation handler (`is_dir` check).
    pub fs: Arc<dyn Fs>,
}
34
/// Newtype wrapper stored as a gpui global by `init`, so that
/// `DevServer::global` can hand out the `DevServer` model.
struct GlobalDevServer(Model<DevServer>);

impl Global for GlobalDevServer {}
38
39pub fn init(client: Arc<Client>, app_state: AppState, cx: &mut AppContext) -> Task<Result<()>> {
40 let dev_server = cx.new_model(|cx| DevServer::new(client.clone(), app_state, cx));
41 cx.set_global(GlobalDevServer(dev_server.clone()));
42
43 // Dev server cannot have any private files for now
44 cx.update_global(|store: &mut SettingsStore, _| {
45 let old_settings = store.get::<WorktreeSettings>(None);
46 store.override_global(WorktreeSettings {
47 private_files: Some(vec![]),
48 ..old_settings.clone()
49 });
50 });
51
52 #[cfg(not(target_os = "windows"))]
53 {
54 use signal_hook::consts::{SIGINT, SIGTERM};
55 use signal_hook::iterator::Signals;
56 // Set up a handler when the dev server is shut down
57 // with ctrl-c or kill
58 let (tx, rx) = futures::channel::oneshot::channel();
59 let mut signals = Signals::new(&[SIGTERM, SIGINT]).unwrap();
60 std::thread::spawn({
61 move || {
62 if let Some(sig) = signals.forever().next() {
63 tx.send(sig).log_err();
64 }
65 }
66 });
67 cx.spawn(|cx| async move {
68 if let Ok(sig) = rx.await {
69 log::info!("received signal {sig:?}");
70 cx.update(|cx| cx.quit()).log_err();
71 }
72 })
73 .detach();
74 }
75
76 let server_url = ClientSettings::get_global(&cx).server_url.clone();
77 cx.spawn(|cx| async move {
78 client
79 .authenticate_and_connect(false, &cx)
80 .await
81 .map_err(|e| anyhow!("Error connecting to '{}': {}", server_url, e))
82 })
83}
84
85impl DevServer {
86 pub fn global(cx: &AppContext) -> Model<DevServer> {
87 cx.global::<GlobalDevServer>().0.clone()
88 }
89
90 pub fn new(client: Arc<Client>, app_state: AppState, cx: &mut ModelContext<Self>) -> Self {
91 cx.on_app_quit(Self::app_will_quit).detach();
92
93 let maintain_connection = cx.spawn({
94 let client = client.clone();
95 move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
96 });
97
98 DevServer {
99 _subscriptions: vec![
100 client.add_message_handler(cx.weak_model(), Self::handle_dev_server_instructions),
101 client.add_request_handler(
102 cx.weak_model(),
103 Self::handle_validate_dev_server_project_request,
104 ),
105 client.add_message_handler(cx.weak_model(), Self::handle_shutdown),
106 ],
107 _maintain_connection: maintain_connection,
108 projects: Default::default(),
109 remote_shutdown: false,
110 app_state,
111 client,
112 }
113 }
114
115 fn app_will_quit(&mut self, _: &mut ModelContext<Self>) -> impl Future<Output = ()> {
116 let request = if self.remote_shutdown {
117 None
118 } else {
119 Some(self.client.request(proto::ShutdownDevServer {}))
120 };
121 async move {
122 if let Some(request) = request {
123 request.await.log_err();
124 }
125 }
126 }
127
128 async fn handle_dev_server_instructions(
129 this: Model<Self>,
130 envelope: TypedEnvelope<proto::DevServerInstructions>,
131 _: Arc<Client>,
132 mut cx: AsyncAppContext,
133 ) -> Result<()> {
134 let (added_projects, removed_projects_ids) = this.read_with(&mut cx, |this, _| {
135 let removed_projects = this
136 .projects
137 .keys()
138 .filter(|dev_server_project_id| {
139 !envelope
140 .payload
141 .projects
142 .iter()
143 .any(|p| p.id == dev_server_project_id.0)
144 })
145 .cloned()
146 .collect::<Vec<_>>();
147
148 let added_projects = envelope
149 .payload
150 .projects
151 .into_iter()
152 .filter(|project| !this.projects.contains_key(&DevServerProjectId(project.id)))
153 .collect::<Vec<_>>();
154
155 (added_projects, removed_projects)
156 })?;
157
158 for dev_server_project in added_projects {
159 DevServer::share_project(this.clone(), &dev_server_project, &mut cx).await?;
160 }
161
162 this.update(&mut cx, |this, cx| {
163 for old_project_id in &removed_projects_ids {
164 this.unshare_project(old_project_id, cx)?;
165 }
166 Ok::<(), anyhow::Error>(())
167 })??;
168 Ok(())
169 }
170
171 async fn handle_validate_dev_server_project_request(
172 this: Model<Self>,
173 envelope: TypedEnvelope<proto::ValidateDevServerProjectRequest>,
174 _: Arc<Client>,
175 cx: AsyncAppContext,
176 ) -> Result<proto::Ack> {
177 let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
178 let path = std::path::Path::new(&expanded);
179 let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
180
181 let path_exists = fs.is_dir(path).await;
182 if !path_exists {
183 return Err(anyhow!(ErrorCode::DevServerProjectPathDoesNotExist))?;
184 }
185
186 Ok(proto::Ack {})
187 }
188
189 async fn handle_shutdown(
190 this: Model<Self>,
191 _envelope: TypedEnvelope<proto::ShutdownDevServer>,
192 _: Arc<Client>,
193 mut cx: AsyncAppContext,
194 ) -> Result<()> {
195 this.update(&mut cx, |this, cx| {
196 this.remote_shutdown = true;
197 cx.quit();
198 })
199 }
200
201 fn unshare_project(
202 &mut self,
203 dev_server_project_id: &DevServerProjectId,
204 cx: &mut ModelContext<Self>,
205 ) -> Result<()> {
206 if let Some(project) = self.projects.remove(dev_server_project_id) {
207 project.update(cx, |project, cx| project.unshare(cx))?;
208 }
209 Ok(())
210 }
211
212 async fn share_project(
213 this: Model<Self>,
214 dev_server_project: &proto::DevServerProject,
215 cx: &mut AsyncAppContext,
216 ) -> Result<()> {
217 let (client, project) = this.update(cx, |this, cx| {
218 let project = Project::local(
219 this.client.clone(),
220 this.app_state.node_runtime.clone(),
221 this.app_state.user_store.clone(),
222 this.app_state.languages.clone(),
223 this.app_state.fs.clone(),
224 cx,
225 );
226
227 (this.client.clone(), project)
228 })?;
229
230 let path = shellexpand::tilde(&dev_server_project.path).to_string();
231
232 project
233 .update(cx, |project, cx| {
234 project.find_or_create_local_worktree(&path, true, cx)
235 })?
236 .await?;
237
238 let worktrees =
239 project.read_with(cx, |project, cx| project.worktree_metadata_protos(cx))?;
240
241 let response = client
242 .request(proto::ShareDevServerProject {
243 dev_server_project_id: dev_server_project.id,
244 worktrees,
245 })
246 .await?;
247
248 let project_id = response.project_id;
249 project.update(cx, |project, cx| project.shared(project_id, cx))??;
250 this.update(cx, |this, _| {
251 this.projects
252 .insert(DevServerProjectId(dev_server_project.id), project);
253 })?;
254 Ok(())
255 }
256
257 async fn maintain_connection(
258 this: WeakModel<Self>,
259 client: Arc<Client>,
260 mut cx: AsyncAppContext,
261 ) -> Result<()> {
262 let mut client_status = client.status();
263
264 let _ = client_status.try_recv();
265 let current_status = *client_status.borrow();
266 if current_status.is_connected() {
267 // wait for first disconnect
268 client_status.recv().await;
269 }
270
271 loop {
272 let Some(current_status) = client_status.recv().await else {
273 return Ok(());
274 };
275 let Some(this) = this.upgrade() else {
276 return Ok(());
277 };
278
279 if !current_status.is_connected() {
280 continue;
281 }
282
283 this.update(&mut cx, |this, cx| this.rejoin(cx))?.await?;
284 }
285 }
286
287 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
288 let mut projects: HashMap<u64, Model<Project>> = HashMap::default();
289 let request = self.client.request(proto::ReconnectDevServer {
290 reshared_projects: self
291 .projects
292 .iter()
293 .flat_map(|(_, handle)| {
294 let project = handle.read(cx);
295 let project_id = project.remote_id()?;
296 projects.insert(project_id, handle.clone());
297 Some(proto::UpdateProject {
298 project_id,
299 worktrees: project.worktree_metadata_protos(cx),
300 })
301 })
302 .collect(),
303 });
304 cx.spawn(|_, mut cx| async move {
305 let response = request.await?;
306
307 for reshared_project in response.reshared_projects {
308 if let Some(project) = projects.get(&reshared_project.id) {
309 project.update(&mut cx, |project, cx| {
310 project.reshared(reshared_project, cx).log_err();
311 })?;
312 }
313 }
314 Ok(())
315 })
316 }
317}