1use anyhow::{anyhow, Result};
2use client::DevServerProjectId;
3use client::{user::UserStore, Client, ClientSettings};
4use extension::ExtensionStore;
5use fs::Fs;
6use futures::Future;
7use gpui::{AppContext, AsyncAppContext, Context, Global, Model, ModelContext, Task, WeakModel};
8use language::LanguageRegistry;
9use node_runtime::NodeRuntime;
10use postage::stream::Stream;
11use project::Project;
12use rpc::{proto, ErrorCode, TypedEnvelope};
13use settings::{Settings, SettingsStore};
14use std::{collections::HashMap, sync::Arc};
15use util::{ResultExt, TryFutureExt};
16
17pub struct DevServer {
18 client: Arc<Client>,
19 app_state: AppState,
20 remote_shutdown: bool,
21 projects: HashMap<DevServerProjectId, Model<Project>>,
22 _subscriptions: Vec<client::Subscription>,
23 _maintain_connection: Task<Option<()>>,
24}
25
26pub struct AppState {
27 pub node_runtime: Arc<dyn NodeRuntime>,
28 pub user_store: Model<UserStore>,
29 pub languages: Arc<LanguageRegistry>,
30 pub fs: Arc<dyn Fs>,
31}
32
33struct GlobalDevServer(Model<DevServer>);
34
35impl Global for GlobalDevServer {}
36
37pub fn init(client: Arc<Client>, app_state: AppState, cx: &mut AppContext) -> Task<Result<()>> {
38 let dev_server = cx.new_model(|cx| DevServer::new(client.clone(), app_state, cx));
39 cx.set_global(GlobalDevServer(dev_server.clone()));
40
41 #[cfg(not(target_os = "windows"))]
42 {
43 use signal_hook::consts::{SIGINT, SIGTERM};
44 use signal_hook::iterator::Signals;
45 // Set up a handler when the dev server is shut down
46 // with ctrl-c or kill
47 let (tx, rx) = futures::channel::oneshot::channel();
48 let mut signals = Signals::new(&[SIGTERM, SIGINT]).unwrap();
49 std::thread::spawn({
50 move || {
51 if let Some(sig) = signals.forever().next() {
52 tx.send(sig).log_err();
53 }
54 }
55 });
56 cx.spawn(|cx| async move {
57 if let Ok(sig) = rx.await {
58 log::info!("received signal {sig:?}");
59 cx.update(|cx| cx.quit()).log_err();
60 }
61 })
62 .detach();
63 }
64
65 let server_url = ClientSettings::get_global(&cx).server_url.clone();
66 cx.spawn(|cx| async move {
67 client
68 .authenticate_and_connect(false, &cx)
69 .await
70 .map_err(|e| anyhow!("Error connecting to '{}': {}", server_url, e))
71 })
72}
73
74impl DevServer {
75 pub fn global(cx: &AppContext) -> Model<DevServer> {
76 cx.global::<GlobalDevServer>().0.clone()
77 }
78
79 pub fn new(client: Arc<Client>, app_state: AppState, cx: &mut ModelContext<Self>) -> Self {
80 cx.on_app_quit(Self::app_will_quit).detach();
81
82 let maintain_connection = cx.spawn({
83 let client = client.clone();
84 move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
85 });
86
87 cx.observe_global::<SettingsStore>(|_, cx| {
88 ExtensionStore::global(cx).update(cx, |store, cx| store.auto_install_extensions(cx))
89 })
90 .detach();
91
92 DevServer {
93 _subscriptions: vec![
94 client.add_message_handler(cx.weak_model(), Self::handle_dev_server_instructions),
95 client.add_request_handler(
96 cx.weak_model(),
97 Self::handle_validate_dev_server_project_request,
98 ),
99 client.add_message_handler(cx.weak_model(), Self::handle_shutdown),
100 ],
101 _maintain_connection: maintain_connection,
102 projects: Default::default(),
103 remote_shutdown: false,
104 app_state,
105 client,
106 }
107 }
108
109 fn app_will_quit(&mut self, _: &mut ModelContext<Self>) -> impl Future<Output = ()> {
110 let request = if self.remote_shutdown {
111 None
112 } else {
113 Some(
114 self.client
115 .request(proto::ShutdownDevServer { reason: None }),
116 )
117 };
118 async move {
119 if let Some(request) = request {
120 request.await.log_err();
121 }
122 }
123 }
124
125 async fn handle_dev_server_instructions(
126 this: Model<Self>,
127 envelope: TypedEnvelope<proto::DevServerInstructions>,
128 mut cx: AsyncAppContext,
129 ) -> Result<()> {
130 let (added_projects, removed_projects_ids) = this.read_with(&mut cx, |this, _| {
131 let removed_projects = this
132 .projects
133 .keys()
134 .filter(|dev_server_project_id| {
135 !envelope
136 .payload
137 .projects
138 .iter()
139 .any(|p| p.id == dev_server_project_id.0)
140 })
141 .cloned()
142 .collect::<Vec<_>>();
143
144 let added_projects = envelope
145 .payload
146 .projects
147 .into_iter()
148 .filter(|project| !this.projects.contains_key(&DevServerProjectId(project.id)))
149 .collect::<Vec<_>>();
150
151 (added_projects, removed_projects)
152 })?;
153
154 for dev_server_project in added_projects {
155 DevServer::share_project(this.clone(), &dev_server_project, &mut cx).await?;
156 }
157
158 this.update(&mut cx, |this, cx| {
159 for old_project_id in &removed_projects_ids {
160 this.unshare_project(old_project_id, cx)?;
161 }
162 Ok::<(), anyhow::Error>(())
163 })??;
164 Ok(())
165 }
166
167 async fn handle_validate_dev_server_project_request(
168 this: Model<Self>,
169 envelope: TypedEnvelope<proto::ValidateDevServerProjectRequest>,
170 cx: AsyncAppContext,
171 ) -> Result<proto::Ack> {
172 let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
173 let path = std::path::Path::new(&expanded);
174 let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
175
176 let path_exists = fs.metadata(path).await.is_ok_and(|result| result.is_some());
177 if !path_exists {
178 return Err(anyhow!(ErrorCode::DevServerProjectPathDoesNotExist))?;
179 }
180
181 Ok(proto::Ack {})
182 }
183
184 async fn handle_shutdown(
185 this: Model<Self>,
186 _envelope: TypedEnvelope<proto::ShutdownDevServer>,
187 mut cx: AsyncAppContext,
188 ) -> Result<()> {
189 this.update(&mut cx, |this, cx| {
190 this.remote_shutdown = true;
191 cx.quit();
192 })
193 }
194
195 fn unshare_project(
196 &mut self,
197 dev_server_project_id: &DevServerProjectId,
198 cx: &mut ModelContext<Self>,
199 ) -> Result<()> {
200 if let Some(project) = self.projects.remove(dev_server_project_id) {
201 project.update(cx, |project, cx| project.unshare(cx))?;
202 }
203 Ok(())
204 }
205
206 async fn share_project(
207 this: Model<Self>,
208 dev_server_project: &proto::DevServerProject,
209 cx: &mut AsyncAppContext,
210 ) -> Result<()> {
211 let (client, project) = this.update(cx, |this, cx| {
212 let project = Project::local(
213 this.client.clone(),
214 this.app_state.node_runtime.clone(),
215 this.app_state.user_store.clone(),
216 this.app_state.languages.clone(),
217 this.app_state.fs.clone(),
218 cx,
219 );
220
221 (this.client.clone(), project)
222 })?;
223
224 let path = shellexpand::tilde(&dev_server_project.path).to_string();
225
226 let (worktree, _) = project
227 .update(cx, |project, cx| {
228 project.find_or_create_local_worktree(&path, true, cx)
229 })?
230 .await?;
231
232 worktree.update(cx, |worktree, cx| {
233 worktree.as_local_mut().unwrap().share_private_files(cx)
234 })?;
235
236 let worktrees =
237 project.read_with(cx, |project, cx| project.worktree_metadata_protos(cx))?;
238
239 let response = client
240 .request(proto::ShareDevServerProject {
241 dev_server_project_id: dev_server_project.id,
242 worktrees,
243 })
244 .await?;
245
246 let project_id = response.project_id;
247 project.update(cx, |project, cx| project.shared(project_id, cx))??;
248 this.update(cx, |this, _| {
249 this.projects
250 .insert(DevServerProjectId(dev_server_project.id), project);
251 })?;
252 Ok(())
253 }
254
255 async fn maintain_connection(
256 this: WeakModel<Self>,
257 client: Arc<Client>,
258 mut cx: AsyncAppContext,
259 ) -> Result<()> {
260 let mut client_status = client.status();
261
262 let _ = client_status.try_recv();
263 let current_status = *client_status.borrow();
264 if current_status.is_connected() {
265 // wait for first disconnect
266 client_status.recv().await;
267 }
268
269 loop {
270 let Some(current_status) = client_status.recv().await else {
271 return Ok(());
272 };
273 let Some(this) = this.upgrade() else {
274 return Ok(());
275 };
276
277 if !current_status.is_connected() {
278 continue;
279 }
280
281 this.update(&mut cx, |this, cx| this.rejoin(cx))?.await?;
282 }
283 }
284
285 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
286 let mut projects: HashMap<u64, Model<Project>> = HashMap::default();
287 let request = self.client.request(proto::ReconnectDevServer {
288 reshared_projects: self
289 .projects
290 .iter()
291 .flat_map(|(_, handle)| {
292 let project = handle.read(cx);
293 let project_id = project.remote_id()?;
294 projects.insert(project_id, handle.clone());
295 Some(proto::UpdateProject {
296 project_id,
297 worktrees: project.worktree_metadata_protos(cx),
298 })
299 })
300 .collect(),
301 });
302 cx.spawn(|_, mut cx| async move {
303 let response = request.await?;
304
305 for reshared_project in response.reshared_projects {
306 if let Some(project) = projects.get(&reshared_project.id) {
307 project.update(&mut cx, |project, cx| {
308 project.reshared(reshared_project, cx).log_err();
309 })?;
310 }
311 }
312 Ok(())
313 })
314 }
315}