1use anyhow::Result;
2use client::DevServerProjectId;
3use client::{user::UserStore, Client, ClientSettings};
4use fs::Fs;
5use futures::Future;
6use gpui::{
7 AppContext, AsyncAppContext, BorrowAppContext, Context, Global, Model, ModelContext, Task,
8 WeakModel,
9};
10use language::LanguageRegistry;
11use node_runtime::NodeRuntime;
12use postage::stream::Stream;
13use project::{Project, WorktreeSettings};
14use rpc::{proto, ErrorCode, TypedEnvelope};
15use settings::{Settings, SettingsStore};
16use std::{collections::HashMap, sync::Arc};
17use util::{ResultExt, TryFutureExt};
18
19pub struct DevServer {
20 client: Arc<Client>,
21 app_state: AppState,
22 remote_shutdown: bool,
23 projects: HashMap<DevServerProjectId, Model<Project>>,
24 _subscriptions: Vec<client::Subscription>,
25 _maintain_connection: Task<Option<()>>,
26}
27
28pub struct AppState {
29 pub node_runtime: Arc<dyn NodeRuntime>,
30 pub user_store: Model<UserStore>,
31 pub languages: Arc<LanguageRegistry>,
32 pub fs: Arc<dyn Fs>,
33}
34
35struct GlobalDevServer(Model<DevServer>);
36
37impl Global for GlobalDevServer {}
38
39pub fn init(client: Arc<Client>, app_state: AppState, cx: &mut AppContext) {
40 let dev_server = cx.new_model(|cx| DevServer::new(client.clone(), app_state, cx));
41 cx.set_global(GlobalDevServer(dev_server.clone()));
42
43 // Dev server cannot have any private files for now
44 cx.update_global(|store: &mut SettingsStore, _| {
45 let old_settings = store.get::<WorktreeSettings>(None);
46 store.override_global(WorktreeSettings {
47 private_files: Some(vec![]),
48 ..old_settings.clone()
49 });
50 });
51
52 // Set up a handler when the dev server is shut down by the user pressing Ctrl-C
53 let (tx, rx) = futures::channel::oneshot::channel();
54 set_ctrlc_handler(move || tx.send(()).log_err().unwrap()).log_err();
55
56 cx.spawn(|cx| async move {
57 rx.await.log_err();
58 log::info!("Received interrupt signal");
59 cx.update(|cx| cx.quit()).log_err();
60 })
61 .detach();
62
63 let server_url = ClientSettings::get_global(&cx).server_url.clone();
64 cx.spawn(|cx| async move {
65 match client.authenticate_and_connect(false, &cx).await {
66 Ok(_) => {
67 log::info!("Connected to {}", server_url);
68 }
69 Err(e) => {
70 log::error!("Error connecting to '{}': {}", server_url, e);
71 cx.update(|cx| cx.quit()).log_err();
72 }
73 }
74 })
75 .detach();
76}
77
78fn set_ctrlc_handler<F>(f: F) -> Result<(), ctrlc::Error>
79where
80 F: FnOnce() + 'static + Send,
81{
82 let f = std::sync::Mutex::new(Some(f));
83 ctrlc::set_handler(move || {
84 if let Ok(mut guard) = f.lock() {
85 let f = guard.take().expect("f can only be taken once");
86 f();
87 }
88 })
89}
90
91impl DevServer {
92 pub fn global(cx: &AppContext) -> Model<DevServer> {
93 cx.global::<GlobalDevServer>().0.clone()
94 }
95
96 pub fn new(client: Arc<Client>, app_state: AppState, cx: &mut ModelContext<Self>) -> Self {
97 cx.on_app_quit(Self::app_will_quit).detach();
98
99 let maintain_connection = cx.spawn({
100 let client = client.clone();
101 move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
102 });
103
104 DevServer {
105 _subscriptions: vec![
106 client.add_message_handler(cx.weak_model(), Self::handle_dev_server_instructions),
107 client.add_request_handler(
108 cx.weak_model(),
109 Self::handle_validate_dev_server_project_request,
110 ),
111 client.add_message_handler(cx.weak_model(), Self::handle_shutdown),
112 ],
113 _maintain_connection: maintain_connection,
114 projects: Default::default(),
115 remote_shutdown: false,
116 app_state,
117 client,
118 }
119 }
120
121 fn app_will_quit(&mut self, _: &mut ModelContext<Self>) -> impl Future<Output = ()> {
122 let request = if self.remote_shutdown {
123 None
124 } else {
125 Some(self.client.request(proto::ShutdownDevServer {}))
126 };
127 async move {
128 if let Some(request) = request {
129 request.await.log_err();
130 }
131 }
132 }
133
134 async fn handle_dev_server_instructions(
135 this: Model<Self>,
136 envelope: TypedEnvelope<proto::DevServerInstructions>,
137 _: Arc<Client>,
138 mut cx: AsyncAppContext,
139 ) -> Result<()> {
140 let (added_projects, removed_projects_ids) = this.read_with(&mut cx, |this, _| {
141 let removed_projects = this
142 .projects
143 .keys()
144 .filter(|dev_server_project_id| {
145 !envelope
146 .payload
147 .projects
148 .iter()
149 .any(|p| p.id == dev_server_project_id.0)
150 })
151 .cloned()
152 .collect::<Vec<_>>();
153
154 let added_projects = envelope
155 .payload
156 .projects
157 .into_iter()
158 .filter(|project| !this.projects.contains_key(&DevServerProjectId(project.id)))
159 .collect::<Vec<_>>();
160
161 (added_projects, removed_projects)
162 })?;
163
164 for dev_server_project in added_projects {
165 DevServer::share_project(this.clone(), &dev_server_project, &mut cx).await?;
166 }
167
168 this.update(&mut cx, |this, cx| {
169 for old_project_id in &removed_projects_ids {
170 this.unshare_project(old_project_id, cx)?;
171 }
172 Ok::<(), anyhow::Error>(())
173 })??;
174 Ok(())
175 }
176
177 async fn handle_validate_dev_server_project_request(
178 this: Model<Self>,
179 envelope: TypedEnvelope<proto::ValidateDevServerProjectRequest>,
180 _: Arc<Client>,
181 cx: AsyncAppContext,
182 ) -> Result<proto::Ack> {
183 let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
184 let path = std::path::Path::new(&expanded);
185 let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
186
187 let path_exists = fs.is_dir(path).await;
188 if !path_exists {
189 return Err(anyhow::anyhow!(ErrorCode::DevServerProjectPathDoesNotExist))?;
190 }
191
192 Ok(proto::Ack {})
193 }
194
195 async fn handle_shutdown(
196 this: Model<Self>,
197 _envelope: TypedEnvelope<proto::ShutdownDevServer>,
198 _: Arc<Client>,
199 mut cx: AsyncAppContext,
200 ) -> Result<()> {
201 this.update(&mut cx, |this, cx| {
202 this.remote_shutdown = true;
203 cx.quit();
204 })
205 }
206
207 fn unshare_project(
208 &mut self,
209 dev_server_project_id: &DevServerProjectId,
210 cx: &mut ModelContext<Self>,
211 ) -> Result<()> {
212 if let Some(project) = self.projects.remove(dev_server_project_id) {
213 project.update(cx, |project, cx| project.unshare(cx))?;
214 }
215 Ok(())
216 }
217
218 async fn share_project(
219 this: Model<Self>,
220 dev_server_project: &proto::DevServerProject,
221 cx: &mut AsyncAppContext,
222 ) -> Result<()> {
223 let (client, project) = this.update(cx, |this, cx| {
224 let project = Project::local(
225 this.client.clone(),
226 this.app_state.node_runtime.clone(),
227 this.app_state.user_store.clone(),
228 this.app_state.languages.clone(),
229 this.app_state.fs.clone(),
230 cx,
231 );
232
233 (this.client.clone(), project)
234 })?;
235
236 let path = shellexpand::tilde(&dev_server_project.path).to_string();
237
238 project
239 .update(cx, |project, cx| {
240 project.find_or_create_local_worktree(&path, true, cx)
241 })?
242 .await?;
243
244 let worktrees =
245 project.read_with(cx, |project, cx| project.worktree_metadata_protos(cx))?;
246
247 let response = client
248 .request(proto::ShareDevServerProject {
249 dev_server_project_id: dev_server_project.id,
250 worktrees,
251 })
252 .await?;
253
254 let project_id = response.project_id;
255 project.update(cx, |project, cx| project.shared(project_id, cx))??;
256 this.update(cx, |this, _| {
257 this.projects
258 .insert(DevServerProjectId(dev_server_project.id), project);
259 })?;
260 Ok(())
261 }
262
263 async fn maintain_connection(
264 this: WeakModel<Self>,
265 client: Arc<Client>,
266 mut cx: AsyncAppContext,
267 ) -> Result<()> {
268 let mut client_status = client.status();
269
270 let _ = client_status.try_recv();
271 let current_status = *client_status.borrow();
272 if current_status.is_connected() {
273 // wait for first disconnect
274 client_status.recv().await;
275 }
276
277 loop {
278 let Some(current_status) = client_status.recv().await else {
279 return Ok(());
280 };
281 let Some(this) = this.upgrade() else {
282 return Ok(());
283 };
284
285 if !current_status.is_connected() {
286 continue;
287 }
288
289 this.update(&mut cx, |this, cx| this.rejoin(cx))?.await?;
290 }
291 }
292
293 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
294 let mut projects: HashMap<u64, Model<Project>> = HashMap::default();
295 let request = self.client.request(proto::ReconnectDevServer {
296 reshared_projects: self
297 .projects
298 .iter()
299 .flat_map(|(_, handle)| {
300 let project = handle.read(cx);
301 let project_id = project.remote_id()?;
302 projects.insert(project_id, handle.clone());
303 Some(proto::UpdateProject {
304 project_id,
305 worktrees: project.worktree_metadata_protos(cx),
306 })
307 })
308 .collect(),
309 });
310 cx.spawn(|_, mut cx| async move {
311 let response = request.await?;
312
313 for reshared_project in response.reshared_projects {
314 if let Some(project) = projects.get(&reshared_project.id) {
315 project.update(&mut cx, |project, cx| {
316 project.reshared(reshared_project, cx).log_err();
317 })?;
318 }
319 }
320 Ok(())
321 })
322 }
323}