1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
4use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
5use postage::{prelude::Stream, sink::Sink, watch};
6use rpc::proto::{RequestMessage, UsersResponse};
7use std::{
8 collections::{hash_map::Entry, HashMap, HashSet},
9 sync::{Arc, Weak},
10};
11use util::TryFutureExt as _;
12
13#[derive(Debug)]
14pub struct User {
15 pub id: u64,
16 pub github_login: String,
17 pub avatar: Option<Arc<ImageData>>,
18}
19
20#[derive(Debug)]
21pub struct Contact {
22 pub user: Arc<User>,
23 pub online: bool,
24 pub projects: Vec<ProjectMetadata>,
25}
26
27#[derive(Debug)]
28pub struct ProjectMetadata {
29 pub id: u64,
30 pub is_shared: bool,
31 pub worktree_root_names: Vec<String>,
32 pub guests: Vec<Arc<User>>,
33}
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36pub enum ContactRequestStatus {
37 None,
38 Pending,
39 RequestSent,
40 RequestReceived,
41 RequestAccepted,
42}
43
44pub struct UserStore {
45 users: HashMap<u64, Arc<User>>,
46 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
47 current_user: watch::Receiver<Option<Arc<User>>>,
48 contacts: Vec<Arc<Contact>>,
49 incoming_contact_requests: Vec<Arc<User>>,
50 outgoing_contact_requests: Vec<Arc<User>>,
51 pending_contact_requests: HashMap<u64, usize>,
52 client: Weak<Client>,
53 http: Arc<dyn HttpClient>,
54 _maintain_contacts: Task<()>,
55 _maintain_current_user: Task<()>,
56}
57
58pub enum Event {}
59
60impl Entity for UserStore {
61 type Event = Event;
62}
63
64enum UpdateContacts {
65 Update(proto::UpdateContacts),
66 Clear(postage::barrier::Sender),
67}
68
69impl UserStore {
70 pub fn new(
71 client: Arc<Client>,
72 http: Arc<dyn HttpClient>,
73 cx: &mut ModelContext<Self>,
74 ) -> Self {
75 let (mut current_user_tx, current_user_rx) = watch::channel();
76 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
77 let rpc_subscription =
78 client.add_message_handler(cx.handle(), Self::handle_update_contacts);
79 Self {
80 users: Default::default(),
81 current_user: current_user_rx,
82 contacts: Default::default(),
83 incoming_contact_requests: Default::default(),
84 outgoing_contact_requests: Default::default(),
85 client: Arc::downgrade(&client),
86 update_contacts_tx,
87 http,
88 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
89 let _subscription = rpc_subscription;
90 while let Some(message) = update_contacts_rx.next().await {
91 if let Some(this) = this.upgrade(&cx) {
92 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
93 .log_err()
94 .await;
95 }
96 }
97 }),
98 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
99 let mut status = client.status();
100 while let Some(status) = status.recv().await {
101 match status {
102 Status::Connected { .. } => {
103 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
104 let user = this
105 .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
106 .log_err()
107 .await;
108 current_user_tx.send(user).await.ok();
109 }
110 }
111 Status::SignedOut => {
112 current_user_tx.send(None).await.ok();
113 if let Some(this) = this.upgrade(&cx) {
114 this.update(&mut cx, |this, _| this.clear_contacts()).await;
115 }
116 }
117 Status::ConnectionLost => {
118 if let Some(this) = this.upgrade(&cx) {
119 this.update(&mut cx, |this, _| this.clear_contacts()).await;
120 }
121 }
122 _ => {}
123 }
124 }
125 }),
126 pending_contact_requests: Default::default(),
127 }
128 }
129
130 async fn handle_update_contacts(
131 this: ModelHandle<Self>,
132 msg: TypedEnvelope<proto::UpdateContacts>,
133 _: Arc<Client>,
134 mut cx: AsyncAppContext,
135 ) -> Result<()> {
136 this.update(&mut cx, |this, _| {
137 this.update_contacts_tx
138 .unbounded_send(UpdateContacts::Update(msg.payload))
139 .unwrap();
140 });
141 Ok(())
142 }
143
144 fn update_contacts(
145 &mut self,
146 message: UpdateContacts,
147 cx: &mut ModelContext<Self>,
148 ) -> Task<Result<()>> {
149 match message {
150 UpdateContacts::Clear(barrier) => {
151 self.contacts.clear();
152 self.incoming_contact_requests.clear();
153 self.outgoing_contact_requests.clear();
154 drop(barrier);
155 Task::ready(Ok(()))
156 }
157 UpdateContacts::Update(message) => {
158 log::info!(
159 "update contacts on client {}: {:?}",
160 self.client.upgrade().unwrap().id,
161 message
162 );
163 let mut user_ids = HashSet::new();
164 for contact in &message.contacts {
165 user_ids.insert(contact.user_id);
166 user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
167 }
168 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
169 user_ids.extend(message.outgoing_requests.iter());
170
171 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
172 cx.spawn(|this, mut cx| async move {
173 load_users.await?;
174
175 // Users are fetched in parallel above and cached in call to get_users
176 // No need to paralellize here
177 let mut updated_contacts = Vec::new();
178 for contact in message.contacts {
179 updated_contacts.push(Arc::new(
180 Contact::from_proto(contact, &this, &mut cx).await?,
181 ));
182 }
183
184 let mut incoming_requests = Vec::new();
185 for request in message.incoming_requests {
186 incoming_requests.push(
187 this.update(&mut cx, |this, cx| {
188 this.fetch_user(request.requester_id, cx)
189 })
190 .await?,
191 );
192 }
193
194 let mut outgoing_requests = Vec::new();
195 for requested_user_id in message.outgoing_requests {
196 outgoing_requests.push(
197 this.update(&mut cx, |this, cx| this.fetch_user(requested_user_id, cx))
198 .await?,
199 );
200 }
201
202 let removed_contacts =
203 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
204 let removed_incoming_requests =
205 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
206 let removed_outgoing_requests =
207 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
208
209 this.update(&mut cx, |this, cx| {
210 // Remove contacts
211 this.contacts
212 .retain(|contact| !removed_contacts.contains(&contact.user.id));
213 // Update existing contacts and insert new ones
214 for updated_contact in updated_contacts {
215 match this.contacts.binary_search_by_key(
216 &&updated_contact.user.github_login,
217 |contact| &contact.user.github_login,
218 ) {
219 Ok(ix) => this.contacts[ix] = updated_contact,
220 Err(ix) => this.contacts.insert(ix, updated_contact),
221 }
222 }
223
224 // Remove incoming contact requests
225 this.incoming_contact_requests
226 .retain(|user| !removed_incoming_requests.contains(&user.id));
227 // Update existing incoming requests and insert new ones
228 for request in incoming_requests {
229 match this
230 .incoming_contact_requests
231 .binary_search_by_key(&&request.github_login, |contact| {
232 &contact.github_login
233 }) {
234 Ok(ix) => this.incoming_contact_requests[ix] = request,
235 Err(ix) => this.incoming_contact_requests.insert(ix, request),
236 }
237 }
238
239 // Remove outgoing contact requests
240 this.outgoing_contact_requests
241 .retain(|user| !removed_outgoing_requests.contains(&user.id));
242 // Update existing incoming requests and insert new ones
243 for request in outgoing_requests {
244 match this
245 .outgoing_contact_requests
246 .binary_search_by_key(&&request.github_login, |contact| {
247 &contact.github_login
248 }) {
249 Ok(ix) => this.outgoing_contact_requests[ix] = request,
250 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
251 }
252 }
253
254 cx.notify();
255 });
256
257 Ok(())
258 })
259 }
260 }
261 }
262
263 pub fn contacts(&self) -> &[Arc<Contact>] {
264 &self.contacts
265 }
266
267 pub fn has_contact(&self, user: &Arc<User>) -> bool {
268 self.contacts
269 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
270 .is_ok()
271 }
272
273 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
274 &self.incoming_contact_requests
275 }
276
277 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
278 &self.outgoing_contact_requests
279 }
280
281 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
282 if self.pending_contact_requests.contains_key(&user.id) {
283 ContactRequestStatus::Pending
284 } else if self
285 .contacts
286 .binary_search_by_key(&&user.id, |contact| &contact.user.id)
287 .is_ok()
288 {
289 ContactRequestStatus::RequestAccepted
290 } else if self
291 .outgoing_contact_requests
292 .binary_search_by_key(&&user.id, |user| &user.id)
293 .is_ok()
294 {
295 ContactRequestStatus::RequestSent
296 } else if self
297 .incoming_contact_requests
298 .binary_search_by_key(&&user.id, |user| &user.id)
299 .is_ok()
300 {
301 ContactRequestStatus::RequestReceived
302 } else {
303 ContactRequestStatus::None
304 }
305 }
306
307 pub fn request_contact(
308 &mut self,
309 responder_id: u64,
310 cx: &mut ModelContext<Self>,
311 ) -> Task<Result<()>> {
312 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
313 }
314
315 pub fn remove_contact(
316 &mut self,
317 user_id: u64,
318 cx: &mut ModelContext<Self>,
319 ) -> Task<Result<()>> {
320 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
321 }
322
323 pub fn respond_to_contact_request(
324 &mut self,
325 requester_id: u64,
326 accept: bool,
327 cx: &mut ModelContext<Self>,
328 ) -> Task<Result<()>> {
329 self.perform_contact_request(
330 requester_id,
331 proto::RespondToContactRequest {
332 requester_id,
333 response: if accept {
334 proto::ContactRequestResponse::Accept
335 } else {
336 proto::ContactRequestResponse::Reject
337 } as i32,
338 },
339 cx,
340 )
341 }
342
343 fn perform_contact_request<T: RequestMessage>(
344 &mut self,
345 user_id: u64,
346 request: T,
347 cx: &mut ModelContext<Self>,
348 ) -> Task<Result<()>> {
349 let client = self.client.upgrade();
350 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
351 cx.notify();
352
353 cx.spawn(|this, mut cx| async move {
354 let response = client
355 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
356 .request(request)
357 .await;
358 this.update(&mut cx, |this, cx| {
359 if let Entry::Occupied(mut request_count) =
360 this.pending_contact_requests.entry(user_id)
361 {
362 *request_count.get_mut() -= 1;
363 if *request_count.get() == 0 {
364 request_count.remove();
365 }
366 }
367 cx.notify();
368 });
369 response?;
370 Ok(())
371 })
372 }
373
374 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
375 let (tx, mut rx) = postage::barrier::channel();
376 self.update_contacts_tx
377 .unbounded_send(UpdateContacts::Clear(tx))
378 .unwrap();
379 async move {
380 rx.recv().await;
381 }
382 }
383
384 pub fn get_users(
385 &mut self,
386 mut user_ids: Vec<u64>,
387 cx: &mut ModelContext<Self>,
388 ) -> Task<Result<()>> {
389 user_ids.retain(|id| !self.users.contains_key(id));
390 if user_ids.is_empty() {
391 Task::ready(Ok(()))
392 } else {
393 let load = self.load_users(proto::GetUsers { user_ids }, cx);
394 cx.foreground().spawn(async move {
395 load.await?;
396 Ok(())
397 })
398 }
399 }
400
401 pub fn fuzzy_search_users(
402 &mut self,
403 query: String,
404 cx: &mut ModelContext<Self>,
405 ) -> Task<Result<Vec<Arc<User>>>> {
406 self.load_users(proto::FuzzySearchUsers { query }, cx)
407 }
408
409 pub fn fetch_user(
410 &mut self,
411 user_id: u64,
412 cx: &mut ModelContext<Self>,
413 ) -> Task<Result<Arc<User>>> {
414 if let Some(user) = self.users.get(&user_id).cloned() {
415 return cx.foreground().spawn(async move { Ok(user) });
416 }
417
418 let load_users = self.get_users(vec![user_id], cx);
419 cx.spawn(|this, mut cx| async move {
420 load_users.await?;
421 this.update(&mut cx, |this, _| {
422 this.users
423 .get(&user_id)
424 .cloned()
425 .ok_or_else(|| anyhow!("server responded with no users"))
426 })
427 })
428 }
429
430 pub fn current_user(&self) -> Option<Arc<User>> {
431 self.current_user.borrow().clone()
432 }
433
434 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
435 self.current_user.clone()
436 }
437
438 fn load_users(
439 &mut self,
440 request: impl RequestMessage<Response = UsersResponse>,
441 cx: &mut ModelContext<Self>,
442 ) -> Task<Result<Vec<Arc<User>>>> {
443 let client = self.client.clone();
444 let http = self.http.clone();
445 cx.spawn_weak(|this, mut cx| async move {
446 if let Some(rpc) = client.upgrade() {
447 let response = rpc.request(request).await.context("error loading users")?;
448 let users = future::join_all(
449 response
450 .users
451 .into_iter()
452 .map(|user| User::new(user, http.as_ref())),
453 )
454 .await;
455
456 if let Some(this) = this.upgrade(&cx) {
457 this.update(&mut cx, |this, _| {
458 for user in &users {
459 this.users.insert(user.id, user.clone());
460 }
461 });
462 }
463 Ok(users)
464 } else {
465 Ok(Vec::new())
466 }
467 })
468 }
469}
470
471impl User {
472 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
473 Arc::new(User {
474 id: message.id,
475 github_login: message.github_login,
476 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
477 })
478 }
479}
480
481impl Contact {
482 async fn from_proto(
483 contact: proto::Contact,
484 user_store: &ModelHandle<UserStore>,
485 cx: &mut AsyncAppContext,
486 ) -> Result<Self> {
487 let user = user_store
488 .update(cx, |user_store, cx| {
489 user_store.fetch_user(contact.user_id, cx)
490 })
491 .await?;
492 let mut projects = Vec::new();
493 for project in contact.projects {
494 let mut guests = Vec::new();
495 for participant_id in project.guests {
496 guests.push(
497 user_store
498 .update(cx, |user_store, cx| {
499 user_store.fetch_user(participant_id, cx)
500 })
501 .await?,
502 );
503 }
504 projects.push(ProjectMetadata {
505 id: project.id,
506 worktree_root_names: project.worktree_root_names.clone(),
507 is_shared: project.is_shared,
508 guests,
509 });
510 }
511 Ok(Self {
512 user,
513 online: contact.online,
514 projects,
515 })
516 }
517
518 pub fn non_empty_projects(&self) -> impl Iterator<Item = &ProjectMetadata> {
519 self.projects
520 .iter()
521 .filter(|project| !project.worktree_root_names.is_empty())
522 }
523}
524
525async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
526 let mut response = http
527 .get(url, Default::default(), true)
528 .await
529 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
530
531 if !response.status().is_success() {
532 return Err(anyhow!("avatar request failed {:?}", response.status()));
533 }
534
535 let mut body = Vec::new();
536 response
537 .body_mut()
538 .read_to_end(&mut body)
539 .await
540 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
541 let format = image::guess_format(&body)?;
542 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
543 Ok(ImageData::new(image))
544}