1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use futures::{future, AsyncReadExt, Future};
4use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
5use postage::{prelude::Stream, sink::Sink, watch};
6use rpc::proto::{RequestMessage, UsersResponse};
7use std::{
8 collections::{HashMap, HashSet},
9 sync::{Arc, Weak},
10};
11use util::TryFutureExt as _;
12
/// A user known to the client, as returned by the server's user queries.
#[derive(Debug)]
pub struct User {
    /// Server-assigned user id; used as the key of the `UserStore` cache.
    pub id: u64,
    /// GitHub login name; contact and request lists are ordered by this field.
    pub github_login: String,
    /// Decoded avatar image, if one could be fetched.
    /// NOTE(review): presumably `None` when the avatar download or decode
    /// fails (the error is routed through `warn_on_err`) — confirm.
    pub avatar: Option<Arc<ImageData>>,
}
19
/// A contact of the current user together with the projects reported for it.
#[derive(Debug)]
pub struct Contact {
    /// The user this contact entry refers to.
    pub user: Arc<User>,
    /// Projects listed for this contact in the server's contact update.
    pub projects: Vec<ProjectMetadata>,
}
25
/// Summary of a contact's project, built from the protobuf contact message.
#[derive(Debug)]
pub struct ProjectMetadata {
    /// Project id as reported by the server.
    pub id: u64,
    /// Copied verbatim from the protobuf project's `is_shared` flag.
    pub is_shared: bool,
    /// Root names of the project's worktrees, as sent by the server.
    pub worktree_root_names: Vec<String>,
    /// Users resolved from the project's guest ids.
    pub guests: Vec<Arc<User>>,
}
33
/// Model that caches users and tracks the current user's contacts and
/// pending contact requests.
pub struct UserStore {
    /// Cache of every user loaded so far, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Holds the most recent `UpdateContacts` message received from the
    /// server; the `_maintain_contacts` task consumes it.
    update_contacts_tx: watch::Sender<Option<proto::UpdateContacts>>,
    /// Receiver side of the current-user channel fed by
    /// `_maintain_current_user`.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept ordered by `github_login` (insertions use binary search).
    contacts: Vec<Arc<Contact>>,
    /// Users who asked to become a contact, ordered by `github_login`.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users the current user asked to become a contact, ordered by
    /// `github_login`.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Weak handle to the RPC client; requests fail or return nothing once
    /// the client has been dropped.
    client: Weak<Client>,
    /// HTTP client used to download avatars.
    http: Arc<dyn HttpClient>,
    /// Background task applying incoming contact updates; stored so it lives
    /// as long as the store.
    _maintain_contacts: Task<()>,
    /// Background task publishing the current user whenever the client's
    /// connection status changes; stored so it lives as long as the store.
    _maintain_current_user: Task<()>,
}
46
/// Events emitted by `UserStore`; no variants are defined, so none can be emitted.
pub enum Event {}
48
/// Registers `UserStore` as a gpui entity whose event type is `Event`.
impl Entity for UserStore {
    type Event = Event;
}
52
53impl UserStore {
54 pub fn new(
55 client: Arc<Client>,
56 http: Arc<dyn HttpClient>,
57 cx: &mut ModelContext<Self>,
58 ) -> Self {
59 let (mut current_user_tx, current_user_rx) = watch::channel();
60 let (update_contacts_tx, mut update_contacts_rx) =
61 watch::channel::<Option<proto::UpdateContacts>>();
62 let rpc_subscription =
63 client.add_message_handler(cx.handle(), Self::handle_update_contacts);
64 Self {
65 users: Default::default(),
66 current_user: current_user_rx,
67 contacts: Default::default(),
68 incoming_contact_requests: Default::default(),
69 outgoing_contact_requests: Default::default(),
70 client: Arc::downgrade(&client),
71 update_contacts_tx,
72 http,
73 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
74 let _subscription = rpc_subscription;
75 while let Some(message) = update_contacts_rx.recv().await {
76 if let Some((message, this)) = message.zip(this.upgrade(&cx)) {
77 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
78 .log_err()
79 .await;
80 }
81 }
82 }),
83 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
84 let mut status = client.status();
85 while let Some(status) = status.recv().await {
86 match status {
87 Status::Connected { .. } => {
88 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
89 let user = this
90 .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
91 .log_err()
92 .await;
93 current_user_tx.send(user).await.ok();
94 }
95 }
96 Status::SignedOut => {
97 current_user_tx.send(None).await.ok();
98 }
99 _ => {}
100 }
101 }
102 }),
103 }
104 }
105
106 async fn handle_update_contacts(
107 this: ModelHandle<Self>,
108 msg: TypedEnvelope<proto::UpdateContacts>,
109 _: Arc<Client>,
110 mut cx: AsyncAppContext,
111 ) -> Result<()> {
112 this.update(&mut cx, |this, _| {
113 *this.update_contacts_tx.borrow_mut() = Some(msg.payload);
114 });
115 Ok(())
116 }
117
118 fn update_contacts(
119 &mut self,
120 message: proto::UpdateContacts,
121 cx: &mut ModelContext<Self>,
122 ) -> Task<Result<()>> {
123 log::info!("update contacts on client {:?}", message);
124 let mut user_ids = HashSet::new();
125 for contact in &message.contacts {
126 user_ids.insert(contact.user_id);
127 user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
128 }
129 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
130 user_ids.extend(message.outgoing_requests.iter());
131
132 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
133 cx.spawn(|this, mut cx| async move {
134 load_users.await?;
135
136 // Users are fetched in parallel above and cached in call to get_users
137 // No need to paralellize here
138 let mut updated_contacts = Vec::new();
139 for contact in message.contacts {
140 updated_contacts.push(Arc::new(
141 Contact::from_proto(contact, &this, &mut cx).await?,
142 ));
143 }
144
145 let mut incoming_requests = Vec::new();
146 for request in message.incoming_requests {
147 incoming_requests.push(
148 this.update(&mut cx, |this, cx| {
149 this.fetch_user(request.requester_id, cx)
150 })
151 .await?,
152 );
153 }
154
155 let mut outgoing_requests = Vec::new();
156 for requested_user_id in message.outgoing_requests {
157 outgoing_requests.push(
158 this.update(&mut cx, |this, cx| this.fetch_user(requested_user_id, cx))
159 .await?,
160 );
161 }
162
163 let removed_contacts =
164 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
165 let removed_incoming_requests =
166 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
167 let removed_outgoing_requests =
168 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
169
170 this.update(&mut cx, |this, cx| {
171 // Remove contacts
172 this.contacts
173 .retain(|contact| !removed_contacts.contains(&contact.user.id));
174 // Update existing contacts and insert new ones
175 for updated_contact in updated_contacts {
176 match this
177 .contacts
178 .binary_search_by_key(&&updated_contact.user.github_login, |contact| {
179 &contact.user.github_login
180 }) {
181 Ok(ix) => this.contacts[ix] = updated_contact,
182 Err(ix) => this.contacts.insert(ix, updated_contact),
183 }
184 }
185 cx.notify();
186
187 // Remove incoming contact requests
188 this.incoming_contact_requests
189 .retain(|user| !removed_incoming_requests.contains(&user.id));
190 // Update existing incoming requests and insert new ones
191 for request in incoming_requests {
192 match this
193 .incoming_contact_requests
194 .binary_search_by_key(&&request.github_login, |contact| {
195 &contact.github_login
196 }) {
197 Ok(ix) => this.incoming_contact_requests[ix] = request,
198 Err(ix) => this.incoming_contact_requests.insert(ix, request),
199 }
200 }
201
202 // Remove outgoing contact requests
203 this.outgoing_contact_requests
204 .retain(|user| !removed_outgoing_requests.contains(&user.id));
205 // Update existing incoming requests and insert new ones
206 for request in outgoing_requests {
207 match this
208 .outgoing_contact_requests
209 .binary_search_by_key(&&request.github_login, |contact| {
210 &contact.github_login
211 }) {
212 Ok(ix) => this.outgoing_contact_requests[ix] = request,
213 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
214 }
215 }
216 });
217
218 Ok(())
219 })
220 }
221
222 pub fn contacts(&self) -> &[Arc<Contact>] {
223 &self.contacts
224 }
225
226 pub fn has_contact(&self, user: &Arc<User>) -> bool {
227 self.contacts
228 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
229 .is_ok()
230 }
231
232 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
233 &self.incoming_contact_requests
234 }
235
236 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
237 &self.outgoing_contact_requests
238 }
239
240 pub fn request_contact(&self, responder_id: u64) -> impl Future<Output = Result<()>> {
241 let client = self.client.upgrade();
242 async move {
243 client
244 .ok_or_else(|| anyhow!("not logged in"))?
245 .request(proto::RequestContact { responder_id })
246 .await?;
247 Ok(())
248 }
249 }
250
251 pub fn respond_to_contact_request(
252 &self,
253 requester_id: u64,
254 accept: bool,
255 ) -> impl Future<Output = Result<()>> {
256 let client = self.client.upgrade();
257 async move {
258 client
259 .ok_or_else(|| anyhow!("not logged in"))?
260 .request(proto::RespondToContactRequest {
261 requester_id,
262 response: if accept {
263 proto::ContactRequestResponse::Accept
264 } else {
265 proto::ContactRequestResponse::Reject
266 } as i32,
267 })
268 .await?;
269 Ok(())
270 }
271 }
272
/// Test helper: empties the contact list and both pending-request lists.
#[cfg(any(test, feature = "test-support"))]
pub fn clear_contacts(&mut self) {
    self.outgoing_contact_requests.clear();
    self.incoming_contact_requests.clear();
    self.contacts.clear();
}
279
280 pub fn get_users(
281 &mut self,
282 mut user_ids: Vec<u64>,
283 cx: &mut ModelContext<Self>,
284 ) -> Task<Result<Vec<Arc<User>>>> {
285 user_ids.retain(|id| !self.users.contains_key(id));
286 self.load_users(proto::GetUsers { user_ids }, cx)
287 }
288
289 pub fn fuzzy_search_users(
290 &mut self,
291 query: String,
292 cx: &mut ModelContext<Self>,
293 ) -> Task<Result<Vec<Arc<User>>>> {
294 self.load_users(proto::FuzzySearchUsers { query }, cx)
295 }
296
297 pub fn fetch_user(
298 &mut self,
299 user_id: u64,
300 cx: &mut ModelContext<Self>,
301 ) -> Task<Result<Arc<User>>> {
302 if let Some(user) = self.users.get(&user_id).cloned() {
303 return cx.foreground().spawn(async move { Ok(user) });
304 }
305
306 let load_users = self.get_users(vec![user_id], cx);
307 cx.spawn(|this, mut cx| async move {
308 load_users.await?;
309 this.update(&mut cx, |this, _| {
310 this.users
311 .get(&user_id)
312 .cloned()
313 .ok_or_else(|| anyhow!("server responded with no users"))
314 })
315 })
316 }
317
318 pub fn current_user(&self) -> Option<Arc<User>> {
319 self.current_user.borrow().clone()
320 }
321
322 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
323 self.current_user.clone()
324 }
325
326 fn load_users(
327 &mut self,
328 request: impl RequestMessage<Response = UsersResponse>,
329 cx: &mut ModelContext<Self>,
330 ) -> Task<Result<Vec<Arc<User>>>> {
331 let client = self.client.clone();
332 let http = self.http.clone();
333 cx.spawn_weak(|this, mut cx| async move {
334 if let Some(rpc) = client.upgrade() {
335 let response = rpc.request(request).await.context("error loading users")?;
336 let users = future::join_all(
337 response
338 .users
339 .into_iter()
340 .map(|user| User::new(user, http.as_ref())),
341 )
342 .await;
343
344 if let Some(this) = this.upgrade(&cx) {
345 this.update(&mut cx, |this, _| {
346 for user in &users {
347 this.users.insert(user.id, user.clone());
348 }
349 });
350 }
351 Ok(users)
352 } else {
353 Ok(Vec::new())
354 }
355 })
356 }
357}
358
359impl User {
360 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
361 Arc::new(User {
362 id: message.id,
363 github_login: message.github_login,
364 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
365 })
366 }
367}
368
369impl Contact {
370 async fn from_proto(
371 contact: proto::Contact,
372 user_store: &ModelHandle<UserStore>,
373 cx: &mut AsyncAppContext,
374 ) -> Result<Self> {
375 let user = user_store
376 .update(cx, |user_store, cx| {
377 user_store.fetch_user(contact.user_id, cx)
378 })
379 .await?;
380 let mut projects = Vec::new();
381 for project in contact.projects {
382 let mut guests = Vec::new();
383 for participant_id in project.guests {
384 guests.push(
385 user_store
386 .update(cx, |user_store, cx| {
387 user_store.fetch_user(participant_id, cx)
388 })
389 .await?,
390 );
391 }
392 projects.push(ProjectMetadata {
393 id: project.id,
394 worktree_root_names: project.worktree_root_names.clone(),
395 is_shared: project.is_shared,
396 guests,
397 });
398 }
399 Ok(Self { user, projects })
400 }
401}
402
403async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
404 let mut response = http
405 .get(url, Default::default(), true)
406 .await
407 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
408
409 if !response.status().is_success() {
410 return Err(anyhow!("avatar request failed {:?}", response.status()));
411 }
412
413 let mut body = Vec::new();
414 response
415 .body_mut()
416 .read_to_end(&mut body)
417 .await
418 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
419 let format = image::guess_format(&body)?;
420 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
421 Ok(ImageData::new(image))
422}