1use anyhow::Result;
2use client::{user::UserStore, Client, ClientSettings, RemoteProjectId};
3use fs::Fs;
4use futures::Future;
5use gpui::{AppContext, AsyncAppContext, Context, Global, Model, ModelContext, Task, WeakModel};
6use language::LanguageRegistry;
7use node_runtime::NodeRuntime;
8use postage::stream::Stream;
9use project::Project;
10use rpc::{proto, TypedEnvelope};
11use settings::Settings;
12use std::{collections::HashMap, sync::Arc};
13use util::{ResultExt, TryFutureExt};
14
15pub struct DevServer {
16 client: Arc<Client>,
17 app_state: AppState,
18 projects: HashMap<RemoteProjectId, Model<Project>>,
19 _subscriptions: Vec<client::Subscription>,
20 _maintain_connection: Task<Option<()>>,
21}
22
23pub struct AppState {
24 pub node_runtime: Arc<dyn NodeRuntime>,
25 pub user_store: Model<UserStore>,
26 pub languages: Arc<LanguageRegistry>,
27 pub fs: Arc<dyn Fs>,
28}
29
30struct GlobalDevServer(Model<DevServer>);
31
32impl Global for GlobalDevServer {}
33
34pub fn init(client: Arc<Client>, app_state: AppState, cx: &mut AppContext) {
35 let dev_server = cx.new_model(|cx| DevServer::new(client.clone(), app_state, cx));
36 cx.set_global(GlobalDevServer(dev_server.clone()));
37
38 // Set up a handler when the dev server is shut down by the user pressing Ctrl-C
39 let (tx, rx) = futures::channel::oneshot::channel();
40 set_ctrlc_handler(move || tx.send(()).log_err().unwrap()).log_err();
41
42 cx.spawn(|cx| async move {
43 rx.await.log_err();
44 log::info!("Received interrupt signal");
45 cx.update(|cx| cx.quit()).log_err();
46 })
47 .detach();
48
49 let server_url = ClientSettings::get_global(&cx).server_url.clone();
50 cx.spawn(|cx| async move {
51 match client.authenticate_and_connect(false, &cx).await {
52 Ok(_) => {
53 log::info!("Connected to {}", server_url);
54 }
55 Err(e) => {
56 log::error!("Error connecting to {}: {}", server_url, e);
57 cx.update(|cx| cx.quit()).log_err();
58 }
59 }
60 })
61 .detach();
62}
63
64fn set_ctrlc_handler<F>(f: F) -> Result<(), ctrlc::Error>
65where
66 F: FnOnce() + 'static + Send,
67{
68 let f = std::sync::Mutex::new(Some(f));
69 ctrlc::set_handler(move || {
70 if let Ok(mut guard) = f.lock() {
71 let f = guard.take().expect("f can only be taken once");
72 f();
73 }
74 })
75}
76
77impl DevServer {
78 pub fn global(cx: &AppContext) -> Model<DevServer> {
79 cx.global::<GlobalDevServer>().0.clone()
80 }
81
82 pub fn new(client: Arc<Client>, app_state: AppState, cx: &mut ModelContext<Self>) -> Self {
83 cx.on_app_quit(Self::app_will_quit).detach();
84
85 let maintain_connection = cx.spawn({
86 let client = client.clone();
87 move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
88 });
89
90 DevServer {
91 _subscriptions: vec![
92 client.add_message_handler(cx.weak_model(), Self::handle_dev_server_instructions)
93 ],
94 _maintain_connection: maintain_connection,
95 projects: Default::default(),
96 app_state,
97 client,
98 }
99 }
100
101 fn app_will_quit(&mut self, _: &mut ModelContext<Self>) -> impl Future<Output = ()> {
102 let request = self.client.request(proto::ShutdownDevServer {});
103 async move {
104 request.await.log_err();
105 }
106 }
107
108 async fn handle_dev_server_instructions(
109 this: Model<Self>,
110 envelope: TypedEnvelope<proto::DevServerInstructions>,
111 _: Arc<Client>,
112 mut cx: AsyncAppContext,
113 ) -> Result<()> {
114 let (added_projects, removed_projects_ids) = this.read_with(&mut cx, |this, _| {
115 let removed_projects = this
116 .projects
117 .keys()
118 .filter(|remote_project_id| {
119 !envelope
120 .payload
121 .projects
122 .iter()
123 .any(|p| p.id == remote_project_id.0)
124 })
125 .cloned()
126 .collect::<Vec<_>>();
127
128 let added_projects = envelope
129 .payload
130 .projects
131 .into_iter()
132 .filter(|project| !this.projects.contains_key(&RemoteProjectId(project.id)))
133 .collect::<Vec<_>>();
134
135 (added_projects, removed_projects)
136 })?;
137
138 for remote_project in added_projects {
139 DevServer::share_project(this.clone(), &remote_project, &mut cx).await?;
140 }
141
142 this.update(&mut cx, |this, cx| {
143 for old_project_id in &removed_projects_ids {
144 this.unshare_project(old_project_id, cx)?;
145 }
146 Ok::<(), anyhow::Error>(())
147 })??;
148 Ok(())
149 }
150
151 fn unshare_project(
152 &mut self,
153 remote_project_id: &RemoteProjectId,
154 cx: &mut ModelContext<Self>,
155 ) -> Result<()> {
156 if let Some(project) = self.projects.remove(remote_project_id) {
157 project.update(cx, |project, cx| project.unshare(cx))?;
158 }
159 Ok(())
160 }
161
162 async fn share_project(
163 this: Model<Self>,
164 remote_project: &proto::RemoteProject,
165 cx: &mut AsyncAppContext,
166 ) -> Result<()> {
167 let (client, project) = this.update(cx, |this, cx| {
168 let project = Project::local(
169 this.client.clone(),
170 this.app_state.node_runtime.clone(),
171 this.app_state.user_store.clone(),
172 this.app_state.languages.clone(),
173 this.app_state.fs.clone(),
174 cx,
175 );
176
177 (this.client.clone(), project)
178 })?;
179
180 project
181 .update(cx, |project, cx| {
182 project.find_or_create_local_worktree(&remote_project.path, true, cx)
183 })?
184 .await?;
185
186 let worktrees =
187 project.read_with(cx, |project, cx| project.worktree_metadata_protos(cx))?;
188
189 let response = client
190 .request(proto::ShareRemoteProject {
191 remote_project_id: remote_project.id,
192 worktrees,
193 })
194 .await?;
195
196 let project_id = response.project_id;
197 project.update(cx, |project, cx| project.shared(project_id, cx))??;
198 this.update(cx, |this, _| {
199 this.projects
200 .insert(RemoteProjectId(remote_project.id), project);
201 })?;
202 Ok(())
203 }
204
205 async fn maintain_connection(
206 this: WeakModel<Self>,
207 client: Arc<Client>,
208 mut cx: AsyncAppContext,
209 ) -> Result<()> {
210 let mut client_status = client.status();
211
212 let _ = client_status.try_recv();
213 let current_status = *client_status.borrow();
214 if current_status.is_connected() {
215 // wait for first disconnect
216 client_status.recv().await;
217 }
218
219 loop {
220 let Some(current_status) = client_status.recv().await else {
221 return Ok(());
222 };
223 let Some(this) = this.upgrade() else {
224 return Ok(());
225 };
226
227 if !current_status.is_connected() {
228 continue;
229 }
230
231 this.update(&mut cx, |this, cx| this.rejoin(cx))?.await?;
232 }
233 }
234
235 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
236 let mut projects: HashMap<u64, Model<Project>> = HashMap::default();
237 let request = self.client.request(proto::ReconnectDevServer {
238 reshared_projects: self
239 .projects
240 .iter()
241 .flat_map(|(_, handle)| {
242 let project = handle.read(cx);
243 let project_id = project.remote_id()?;
244 projects.insert(project_id, handle.clone());
245 Some(proto::UpdateProject {
246 project_id,
247 worktrees: project.worktree_metadata_protos(cx),
248 })
249 })
250 .collect(),
251 });
252 cx.spawn(|_, mut cx| async move {
253 let response = request.await?;
254
255 for reshared_project in response.reshared_projects {
256 if let Some(project) = projects.get(&reshared_project.id) {
257 project.update(&mut cx, |project, cx| {
258 project.reshared(reshared_project, cx).log_err();
259 })?;
260 }
261 }
262 Ok(())
263 })
264 }
265}