1use anyhow::Result;
2use client::RemoteProjectId;
3use client::{user::UserStore, Client, ClientSettings};
4use fs::Fs;
5use futures::Future;
6use gpui::{
7 AppContext, AsyncAppContext, BorrowAppContext, Context, Global, Model, ModelContext, Task,
8 WeakModel,
9};
10use language::LanguageRegistry;
11use node_runtime::NodeRuntime;
12use postage::stream::Stream;
13use project::{Project, WorktreeSettings};
14use rpc::{proto, ErrorCode, TypedEnvelope};
15use settings::{Settings, SettingsStore};
16use std::{collections::HashMap, sync::Arc};
17use util::{ResultExt, TryFutureExt};
18
/// State of the dev server: its client connection, the shared application
/// services, and the projects it currently has shared.
pub struct DevServer {
    /// Client used to send requests and to register the message handlers.
    client: Arc<Client>,
    /// Application-wide services handed to `Project::local` when a project is opened.
    app_state: AppState,
    /// Set to `true` when a `ShutdownDevServer` message is received; when set,
    /// `app_will_quit` does not send a shutdown request of its own.
    remote_shutdown: bool,
    /// Projects that have been shared, keyed by their remote project id.
    projects: HashMap<RemoteProjectId, Model<Project>>,
    /// Handles returned when the message/request handlers are registered in `new`.
    /// NOTE(review): presumably dropping these unregisters the handlers — confirm.
    _subscriptions: Vec<client::Subscription>,
    /// Task running `maintain_connection`, spawned in `new`.
    _maintain_connection: Task<Option<()>>,
}
27
/// Application-wide services the dev server needs in order to open projects.
///
/// All four fields are passed to `Project::local` when a project is shared.
pub struct AppState {
    /// Node runtime handed to each project.
    pub node_runtime: Arc<dyn NodeRuntime>,
    /// User store handed to each project.
    pub user_store: Model<UserStore>,
    /// Language registry handed to each project.
    pub languages: Arc<LanguageRegistry>,
    /// File system handed to each project; also used to check that a requested
    /// project path is a directory.
    pub fs: Arc<dyn Fs>,
}
34
/// Wrapper that lets the dev server model be stored as a gpui global
/// (set in `init`, read back by `DevServer::global`).
struct GlobalDevServer(Model<DevServer>);

impl Global for GlobalDevServer {}
38
39pub fn init(client: Arc<Client>, app_state: AppState, cx: &mut AppContext) {
40 let dev_server = cx.new_model(|cx| DevServer::new(client.clone(), app_state, cx));
41 cx.set_global(GlobalDevServer(dev_server.clone()));
42
43 // Dev server cannot have any private files for now
44 cx.update_global(|store: &mut SettingsStore, _| {
45 let old_settings = store.get::<WorktreeSettings>(None);
46 store.override_global(WorktreeSettings {
47 private_files: Some(vec![]),
48 ..old_settings.clone()
49 });
50 });
51
52 // Set up a handler when the dev server is shut down by the user pressing Ctrl-C
53 let (tx, rx) = futures::channel::oneshot::channel();
54 set_ctrlc_handler(move || tx.send(()).log_err().unwrap()).log_err();
55
56 cx.spawn(|cx| async move {
57 rx.await.log_err();
58 log::info!("Received interrupt signal");
59 cx.update(|cx| cx.quit()).log_err();
60 })
61 .detach();
62
63 let server_url = ClientSettings::get_global(&cx).server_url.clone();
64 cx.spawn(|cx| async move {
65 match client.authenticate_and_connect(false, &cx).await {
66 Ok(_) => {
67 log::info!("Connected to {}", server_url);
68 }
69 Err(e) => {
70 log::error!("Error connecting to '{}': {}", server_url, e);
71 cx.update(|cx| cx.quit()).log_err();
72 }
73 }
74 })
75 .detach();
76}
77
78fn set_ctrlc_handler<F>(f: F) -> Result<(), ctrlc::Error>
79where
80 F: FnOnce() + 'static + Send,
81{
82 let f = std::sync::Mutex::new(Some(f));
83 ctrlc::set_handler(move || {
84 if let Ok(mut guard) = f.lock() {
85 let f = guard.take().expect("f can only be taken once");
86 f();
87 }
88 })
89}
90
91impl DevServer {
92 pub fn global(cx: &AppContext) -> Model<DevServer> {
93 cx.global::<GlobalDevServer>().0.clone()
94 }
95
96 pub fn new(client: Arc<Client>, app_state: AppState, cx: &mut ModelContext<Self>) -> Self {
97 cx.on_app_quit(Self::app_will_quit).detach();
98
99 let maintain_connection = cx.spawn({
100 let client = client.clone();
101 move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
102 });
103
104 DevServer {
105 _subscriptions: vec![
106 client.add_message_handler(cx.weak_model(), Self::handle_dev_server_instructions),
107 client.add_request_handler(
108 cx.weak_model(),
109 Self::handle_validate_remote_project_request,
110 ),
111 client.add_message_handler(cx.weak_model(), Self::handle_shutdown),
112 ],
113 _maintain_connection: maintain_connection,
114 projects: Default::default(),
115 remote_shutdown: false,
116 app_state,
117 client,
118 }
119 }
120
121 fn app_will_quit(&mut self, _: &mut ModelContext<Self>) -> impl Future<Output = ()> {
122 let request = if self.remote_shutdown {
123 None
124 } else {
125 Some(self.client.request(proto::ShutdownDevServer {}))
126 };
127 async move {
128 if let Some(request) = request {
129 request.await.log_err();
130 }
131 }
132 }
133
134 async fn handle_dev_server_instructions(
135 this: Model<Self>,
136 envelope: TypedEnvelope<proto::DevServerInstructions>,
137 _: Arc<Client>,
138 mut cx: AsyncAppContext,
139 ) -> Result<()> {
140 let (added_projects, removed_projects_ids) = this.read_with(&mut cx, |this, _| {
141 let removed_projects = this
142 .projects
143 .keys()
144 .filter(|remote_project_id| {
145 !envelope
146 .payload
147 .projects
148 .iter()
149 .any(|p| p.id == remote_project_id.0)
150 })
151 .cloned()
152 .collect::<Vec<_>>();
153
154 let added_projects = envelope
155 .payload
156 .projects
157 .into_iter()
158 .filter(|project| !this.projects.contains_key(&RemoteProjectId(project.id)))
159 .collect::<Vec<_>>();
160
161 (added_projects, removed_projects)
162 })?;
163
164 for remote_project in added_projects {
165 DevServer::share_project(this.clone(), &remote_project, &mut cx).await?;
166 }
167
168 this.update(&mut cx, |this, cx| {
169 for old_project_id in &removed_projects_ids {
170 this.unshare_project(old_project_id, cx)?;
171 }
172 Ok::<(), anyhow::Error>(())
173 })??;
174 Ok(())
175 }
176
177 async fn handle_validate_remote_project_request(
178 this: Model<Self>,
179 envelope: TypedEnvelope<proto::ValidateRemoteProjectRequest>,
180 _: Arc<Client>,
181 cx: AsyncAppContext,
182 ) -> Result<proto::Ack> {
183 let path = std::path::Path::new(&envelope.payload.path);
184 let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
185
186 let path_exists = fs.is_dir(path).await;
187 if !path_exists {
188 return Err(anyhow::anyhow!(ErrorCode::RemoteProjectPathDoesNotExist))?;
189 }
190
191 Ok(proto::Ack {})
192 }
193
194 async fn handle_shutdown(
195 this: Model<Self>,
196 _envelope: TypedEnvelope<proto::ShutdownDevServer>,
197 _: Arc<Client>,
198 mut cx: AsyncAppContext,
199 ) -> Result<()> {
200 this.update(&mut cx, |this, cx| {
201 this.remote_shutdown = true;
202 cx.quit();
203 })
204 }
205
206 fn unshare_project(
207 &mut self,
208 remote_project_id: &RemoteProjectId,
209 cx: &mut ModelContext<Self>,
210 ) -> Result<()> {
211 if let Some(project) = self.projects.remove(remote_project_id) {
212 project.update(cx, |project, cx| project.unshare(cx))?;
213 }
214 Ok(())
215 }
216
217 async fn share_project(
218 this: Model<Self>,
219 remote_project: &proto::RemoteProject,
220 cx: &mut AsyncAppContext,
221 ) -> Result<()> {
222 let (client, project) = this.update(cx, |this, cx| {
223 let project = Project::local(
224 this.client.clone(),
225 this.app_state.node_runtime.clone(),
226 this.app_state.user_store.clone(),
227 this.app_state.languages.clone(),
228 this.app_state.fs.clone(),
229 cx,
230 );
231
232 (this.client.clone(), project)
233 })?;
234
235 project
236 .update(cx, |project, cx| {
237 project.find_or_create_local_worktree(&remote_project.path, true, cx)
238 })?
239 .await?;
240
241 let worktrees =
242 project.read_with(cx, |project, cx| project.worktree_metadata_protos(cx))?;
243
244 let response = client
245 .request(proto::ShareRemoteProject {
246 remote_project_id: remote_project.id,
247 worktrees,
248 })
249 .await?;
250
251 let project_id = response.project_id;
252 project.update(cx, |project, cx| project.shared(project_id, cx))??;
253 this.update(cx, |this, _| {
254 this.projects
255 .insert(RemoteProjectId(remote_project.id), project);
256 })?;
257 Ok(())
258 }
259
260 async fn maintain_connection(
261 this: WeakModel<Self>,
262 client: Arc<Client>,
263 mut cx: AsyncAppContext,
264 ) -> Result<()> {
265 let mut client_status = client.status();
266
267 let _ = client_status.try_recv();
268 let current_status = *client_status.borrow();
269 if current_status.is_connected() {
270 // wait for first disconnect
271 client_status.recv().await;
272 }
273
274 loop {
275 let Some(current_status) = client_status.recv().await else {
276 return Ok(());
277 };
278 let Some(this) = this.upgrade() else {
279 return Ok(());
280 };
281
282 if !current_status.is_connected() {
283 continue;
284 }
285
286 this.update(&mut cx, |this, cx| this.rejoin(cx))?.await?;
287 }
288 }
289
290 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
291 let mut projects: HashMap<u64, Model<Project>> = HashMap::default();
292 let request = self.client.request(proto::ReconnectDevServer {
293 reshared_projects: self
294 .projects
295 .iter()
296 .flat_map(|(_, handle)| {
297 let project = handle.read(cx);
298 let project_id = project.remote_id()?;
299 projects.insert(project_id, handle.clone());
300 Some(proto::UpdateProject {
301 project_id,
302 worktrees: project.worktree_metadata_protos(cx),
303 })
304 })
305 .collect(),
306 });
307 cx.spawn(|_, mut cx| async move {
308 let response = request.await?;
309
310 for reshared_project in response.reshared_projects {
311 if let Some(project) = projects.get(&reshared_project.id) {
312 project.update(&mut cx, |project, cx| {
313 project.reshared(reshared_project, cx).log_err();
314 })?;
315 }
316 }
317 Ok(())
318 })
319 }
320}