headless.rs

  1use anyhow::Result;
  2use client::RemoteProjectId;
  3use client::{user::UserStore, Client, ClientSettings};
  4use fs::Fs;
  5use futures::Future;
  6use gpui::{
  7    AppContext, AsyncAppContext, BorrowAppContext, Context, Global, Model, ModelContext, Task,
  8    WeakModel,
  9};
 10use language::LanguageRegistry;
 11use node_runtime::NodeRuntime;
 12use postage::stream::Stream;
 13use project::{Project, WorktreeSettings};
 14use rpc::{proto, ErrorCode, TypedEnvelope};
 15use settings::{Settings, SettingsStore};
 16use std::{collections::HashMap, sync::Arc};
 17use util::{ResultExt, TryFutureExt};
 18
 19pub struct DevServer {
 20    client: Arc<Client>,
 21    app_state: AppState,
 22    remote_shutdown: bool,
 23    projects: HashMap<RemoteProjectId, Model<Project>>,
 24    _subscriptions: Vec<client::Subscription>,
 25    _maintain_connection: Task<Option<()>>,
 26}
 27
 28pub struct AppState {
 29    pub node_runtime: Arc<dyn NodeRuntime>,
 30    pub user_store: Model<UserStore>,
 31    pub languages: Arc<LanguageRegistry>,
 32    pub fs: Arc<dyn Fs>,
 33}
 34
 35struct GlobalDevServer(Model<DevServer>);
 36
 37impl Global for GlobalDevServer {}
 38
 39pub fn init(client: Arc<Client>, app_state: AppState, cx: &mut AppContext) {
 40    let dev_server = cx.new_model(|cx| DevServer::new(client.clone(), app_state, cx));
 41    cx.set_global(GlobalDevServer(dev_server.clone()));
 42
 43    // Dev server cannot have any private files for now
 44    cx.update_global(|store: &mut SettingsStore, _| {
 45        let old_settings = store.get::<WorktreeSettings>(None);
 46        store.override_global(WorktreeSettings {
 47            private_files: Some(vec![]),
 48            ..old_settings.clone()
 49        });
 50    });
 51
 52    // Set up a handler when the dev server is shut down by the user pressing Ctrl-C
 53    let (tx, rx) = futures::channel::oneshot::channel();
 54    set_ctrlc_handler(move || tx.send(()).log_err().unwrap()).log_err();
 55
 56    cx.spawn(|cx| async move {
 57        rx.await.log_err();
 58        log::info!("Received interrupt signal");
 59        cx.update(|cx| cx.quit()).log_err();
 60    })
 61    .detach();
 62
 63    let server_url = ClientSettings::get_global(&cx).server_url.clone();
 64    cx.spawn(|cx| async move {
 65        match client.authenticate_and_connect(false, &cx).await {
 66            Ok(_) => {
 67                log::info!("Connected to {}", server_url);
 68            }
 69            Err(e) => {
 70                log::error!("Error connecting to '{}': {}", server_url, e);
 71                cx.update(|cx| cx.quit()).log_err();
 72            }
 73        }
 74    })
 75    .detach();
 76}
 77
 78fn set_ctrlc_handler<F>(f: F) -> Result<(), ctrlc::Error>
 79where
 80    F: FnOnce() + 'static + Send,
 81{
 82    let f = std::sync::Mutex::new(Some(f));
 83    ctrlc::set_handler(move || {
 84        if let Ok(mut guard) = f.lock() {
 85            let f = guard.take().expect("f can only be taken once");
 86            f();
 87        }
 88    })
 89}
 90
 91impl DevServer {
 92    pub fn global(cx: &AppContext) -> Model<DevServer> {
 93        cx.global::<GlobalDevServer>().0.clone()
 94    }
 95
 96    pub fn new(client: Arc<Client>, app_state: AppState, cx: &mut ModelContext<Self>) -> Self {
 97        cx.on_app_quit(Self::app_will_quit).detach();
 98
 99        let maintain_connection = cx.spawn({
100            let client = client.clone();
101            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
102        });
103
104        DevServer {
105            _subscriptions: vec![
106                client.add_message_handler(cx.weak_model(), Self::handle_dev_server_instructions),
107                client.add_request_handler(
108                    cx.weak_model(),
109                    Self::handle_validate_remote_project_request,
110                ),
111                client.add_message_handler(cx.weak_model(), Self::handle_shutdown),
112            ],
113            _maintain_connection: maintain_connection,
114            projects: Default::default(),
115            remote_shutdown: false,
116            app_state,
117            client,
118        }
119    }
120
121    fn app_will_quit(&mut self, _: &mut ModelContext<Self>) -> impl Future<Output = ()> {
122        let request = if self.remote_shutdown {
123            None
124        } else {
125            Some(self.client.request(proto::ShutdownDevServer {}))
126        };
127        async move {
128            if let Some(request) = request {
129                request.await.log_err();
130            }
131        }
132    }
133
134    async fn handle_dev_server_instructions(
135        this: Model<Self>,
136        envelope: TypedEnvelope<proto::DevServerInstructions>,
137        _: Arc<Client>,
138        mut cx: AsyncAppContext,
139    ) -> Result<()> {
140        let (added_projects, removed_projects_ids) = this.read_with(&mut cx, |this, _| {
141            let removed_projects = this
142                .projects
143                .keys()
144                .filter(|remote_project_id| {
145                    !envelope
146                        .payload
147                        .projects
148                        .iter()
149                        .any(|p| p.id == remote_project_id.0)
150                })
151                .cloned()
152                .collect::<Vec<_>>();
153
154            let added_projects = envelope
155                .payload
156                .projects
157                .into_iter()
158                .filter(|project| !this.projects.contains_key(&RemoteProjectId(project.id)))
159                .collect::<Vec<_>>();
160
161            (added_projects, removed_projects)
162        })?;
163
164        for remote_project in added_projects {
165            DevServer::share_project(this.clone(), &remote_project, &mut cx).await?;
166        }
167
168        this.update(&mut cx, |this, cx| {
169            for old_project_id in &removed_projects_ids {
170                this.unshare_project(old_project_id, cx)?;
171            }
172            Ok::<(), anyhow::Error>(())
173        })??;
174        Ok(())
175    }
176
177    async fn handle_validate_remote_project_request(
178        this: Model<Self>,
179        envelope: TypedEnvelope<proto::ValidateRemoteProjectRequest>,
180        _: Arc<Client>,
181        cx: AsyncAppContext,
182    ) -> Result<proto::Ack> {
183        let path = std::path::Path::new(&envelope.payload.path);
184        let fs = cx.read_model(&this, |this, _| this.app_state.fs.clone())?;
185
186        let path_exists = fs.is_dir(path).await;
187        if !path_exists {
188            return Err(anyhow::anyhow!(ErrorCode::RemoteProjectPathDoesNotExist))?;
189        }
190
191        Ok(proto::Ack {})
192    }
193
194    async fn handle_shutdown(
195        this: Model<Self>,
196        _envelope: TypedEnvelope<proto::ShutdownDevServer>,
197        _: Arc<Client>,
198        mut cx: AsyncAppContext,
199    ) -> Result<()> {
200        this.update(&mut cx, |this, cx| {
201            this.remote_shutdown = true;
202            cx.quit();
203        })
204    }
205
206    fn unshare_project(
207        &mut self,
208        remote_project_id: &RemoteProjectId,
209        cx: &mut ModelContext<Self>,
210    ) -> Result<()> {
211        if let Some(project) = self.projects.remove(remote_project_id) {
212            project.update(cx, |project, cx| project.unshare(cx))?;
213        }
214        Ok(())
215    }
216
217    async fn share_project(
218        this: Model<Self>,
219        remote_project: &proto::RemoteProject,
220        cx: &mut AsyncAppContext,
221    ) -> Result<()> {
222        let (client, project) = this.update(cx, |this, cx| {
223            let project = Project::local(
224                this.client.clone(),
225                this.app_state.node_runtime.clone(),
226                this.app_state.user_store.clone(),
227                this.app_state.languages.clone(),
228                this.app_state.fs.clone(),
229                cx,
230            );
231
232            (this.client.clone(), project)
233        })?;
234
235        project
236            .update(cx, |project, cx| {
237                project.find_or_create_local_worktree(&remote_project.path, true, cx)
238            })?
239            .await?;
240
241        let worktrees =
242            project.read_with(cx, |project, cx| project.worktree_metadata_protos(cx))?;
243
244        let response = client
245            .request(proto::ShareRemoteProject {
246                remote_project_id: remote_project.id,
247                worktrees,
248            })
249            .await?;
250
251        let project_id = response.project_id;
252        project.update(cx, |project, cx| project.shared(project_id, cx))??;
253        this.update(cx, |this, _| {
254            this.projects
255                .insert(RemoteProjectId(remote_project.id), project);
256        })?;
257        Ok(())
258    }
259
260    async fn maintain_connection(
261        this: WeakModel<Self>,
262        client: Arc<Client>,
263        mut cx: AsyncAppContext,
264    ) -> Result<()> {
265        let mut client_status = client.status();
266
267        let _ = client_status.try_recv();
268        let current_status = *client_status.borrow();
269        if current_status.is_connected() {
270            // wait for first disconnect
271            client_status.recv().await;
272        }
273
274        loop {
275            let Some(current_status) = client_status.recv().await else {
276                return Ok(());
277            };
278            let Some(this) = this.upgrade() else {
279                return Ok(());
280            };
281
282            if !current_status.is_connected() {
283                continue;
284            }
285
286            this.update(&mut cx, |this, cx| this.rejoin(cx))?.await?;
287        }
288    }
289
290    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
291        let mut projects: HashMap<u64, Model<Project>> = HashMap::default();
292        let request = self.client.request(proto::ReconnectDevServer {
293            reshared_projects: self
294                .projects
295                .iter()
296                .flat_map(|(_, handle)| {
297                    let project = handle.read(cx);
298                    let project_id = project.remote_id()?;
299                    projects.insert(project_id, handle.clone());
300                    Some(proto::UpdateProject {
301                        project_id,
302                        worktrees: project.worktree_metadata_protos(cx),
303                    })
304                })
305                .collect(),
306        });
307        cx.spawn(|_, mut cx| async move {
308            let response = request.await?;
309
310            for reshared_project in response.reshared_projects {
311                if let Some(project) = projects.get(&reshared_project.id) {
312                    project.update(&mut cx, |project, cx| {
313                        project.reshared(reshared_project, cx).log_err();
314                    })?;
315                }
316            }
317            Ok(())
318        })
319    }
320}