1use crate::call::Call;
2
3use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
4use anyhow::{anyhow, Context, Result};
5use collections::{hash_map::Entry, BTreeSet, HashMap, HashSet};
6use futures::{channel::mpsc, future, AsyncReadExt, Future, Stream, StreamExt};
7use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
8use postage::{broadcast, sink::Sink, watch};
9use rpc::proto::{RequestMessage, UsersResponse};
10use std::sync::{Arc, Weak};
11use util::TryFutureExt as _;
12
/// A user known to this client.
///
/// Instances are built from `proto::User` messages and shared as `Arc<User>`.
#[derive(Debug)]
pub struct User {
    /// Numeric user id; used as the key of the `UserStore` user cache.
    pub id: u64,
    /// GitHub login; contact lists are kept sorted and searched by this value.
    pub github_login: String,
    /// Decoded avatar image, when one was obtained while constructing the user.
    // NOTE(review): presumably `None` when fetching/decoding the avatar failed — confirm
    // against `warn_on_err`.
    pub avatar: Option<Arc<ImageData>>,
}
19
20impl PartialOrd for User {
21 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
22 Some(self.cmp(other))
23 }
24}
25
26impl Ord for User {
27 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
28 self.github_login.cmp(&other.github_login)
29 }
30}
31
32impl PartialEq for User {
33 fn eq(&self, other: &Self) -> bool {
34 self.id == other.id && self.github_login == other.github_login
35 }
36}
37
// `User` equality is a plain comparison of a `u64` and a `String`, so it is a full
// equivalence relation and the marker trait can be implemented without extra methods.
impl Eq for User {}
39
/// A contact of the current user, as built by `Contact::from_proto`.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact entry refers to.
    pub user: Arc<User>,
    /// Copied from the `online` field of the `proto::Contact` message.
    pub online: bool,
    /// One entry per project listed in the `proto::Contact` message.
    pub projects: Vec<ProjectMetadata>,
}
46
/// Summary of a project attached to a [`Contact`].
#[derive(Clone, Debug, PartialEq)]
pub struct ProjectMetadata {
    /// Project id as reported in the proto message.
    pub id: u64,
    /// Names of the project's visible worktree roots; may be empty
    /// (see `Contact::non_empty_projects`).
    pub visible_worktree_root_names: Vec<String>,
    /// Users listed as guests of the project, kept in `User`'s `Ord` order.
    pub guests: BTreeSet<Arc<User>>,
}
53
/// Relationship between the current user and another user, as computed by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the contact or contact-request lists.
    None,
    /// The user is in the outgoing contact-request list.
    RequestSent,
    /// The user is in the incoming contact-request list.
    RequestReceived,
    /// The user is in the contact list.
    RequestAccepted,
}
61
/// Model holding the users, contacts and contact requests known to this client.
pub struct UserStore {
    /// Cache of loaded users keyed by user id; filled by `load_users`.
    users: HashMap<u64, Arc<User>>,
    /// Sending half of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Latest current-user value published by the `_maintain_current_user` task.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept sorted by `github_login` (inserted via binary search).
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, kept sorted by `github_login`.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, kept sorted by `github_login`.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact RPCs per user id; entries are removed when they reach zero.
    pending_contact_requests: HashMap<u64, usize>,
    /// Most recent invite info received from the server, if any.
    invite_info: Option<InviteInfo>,
    /// Broadcast channel from which `incoming_calls()` hands out subscriptions.
    incoming_calls: broadcast::Sender<Call>,
    /// Weak handle to the RPC client; upgraded on demand when issuing requests.
    client: Weak<Client>,
    /// HTTP client used to download user avatars.
    http: Arc<dyn HttpClient>,
    /// Background task that applies queued `UpdateContacts` messages; also owns the
    /// RPC message-handler subscriptions.
    _maintain_contacts: Task<()>,
    /// Background task that tracks the client's connection status.
    _maintain_current_user: Task<()>,
}
77
/// Invite information received through a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The `count` field of the message.
    // NOTE(review): presumably the number of invites available/used — confirm with the server protocol.
    pub count: u32,
    /// The `url` field of the message.
    pub url: Arc<str>,
}
83
/// Events emitted by [`UserStore`].
pub enum Event {
    /// Something happened to the contact relationship with `user`; `kind` says what.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
}
91
/// What happened in an [`Event::Contact`].
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    /// Emitted for an incoming contact request whose `should_notify` flag is set.
    Requested,
    /// Emitted for an updated contact whose `should_notify` flag is set.
    Accepted,
    /// Emitted when an incoming contact request is removed.
    Cancelled,
}
98
// Makes `UserStore` usable as a gpui entity; the events it emits are of type `Event`.
impl Entity for UserStore {
    type Event = Event;
}
102
/// Messages processed, in order, by the `_maintain_contacts` task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Drop the barrier sender once every previously queued message has been processed.
    Wait(postage::barrier::Sender),
    /// Empty the contact and contact-request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
108
109impl UserStore {
    /// Creates a `UserStore` bound to `client`.
    ///
    /// Registers message handlers on the client for contact updates, invite-info updates
    /// and show-contacts messages, and spawns two background tasks:
    ///
    /// * `_maintain_contacts` drains the `UpdateContacts` queue one message at a time,
    ///   awaiting each `update_contacts` task before taking the next one.
    /// * `_maintain_current_user` follows `client.status()`: on `Connected` it fetches the
    ///   user for `client.user_id()` and publishes it; on `SignedOut` it publishes `None`
    ///   and clears the contacts; on `ConnectionLost` it only clears the contacts.
    ///
    /// `http` is stored and later used to download avatars.
    pub fn new(
        client: Arc<Client>,
        http: Arc<dyn HttpClient>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.handle(), Self::handle_update_contacts),
            client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
            client.add_message_handler(cx.handle(), Self::handle_show_contacts),
        ];
        // Only the sender is kept; receivers are created on demand in `incoming_calls()`.
        let (incoming_calls, _) = broadcast::channel(32);
        Self {
            users: Default::default(),
            current_user: current_user_rx,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            incoming_calls,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            http,
            _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
                // Move the subscriptions into the task so they live exactly as long as it does.
                let _subscriptions = rpc_subscriptions;
                while let Some(message) = update_contacts_rx.next().await {
                    // The store is held weakly; messages arriving after it is gone are skipped.
                    if let Some(this) = this.upgrade(&cx) {
                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
                            .log_err()
                            .await;
                    }
                }
            }),
            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
                let mut status = client.status();
                while let Some(status) = status.next().await {
                    match status {
                        Status::Connected { .. } => {
                            // Requires both a live store and a known user id.
                            if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
                                let user = this
                                    .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
                                    .log_err()
                                    .await;
                                // Send failures are ignored via `.ok()`.
                                current_user_tx.send(user).await.ok();
                            }
                        }
                        Status::SignedOut => {
                            current_user_tx.send(None).await.ok();
                            if let Some(this) = this.upgrade(&cx) {
                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
                            }
                        }
                        Status::ConnectionLost => {
                            // Unlike `SignedOut`, the current user is left untouched here.
                            if let Some(this) = this.upgrade(&cx) {
                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
                            }
                        }
                        _ => {}
                    }
                }
            }),
            pending_contact_requests: Default::default(),
        }
    }
175
176 async fn handle_update_invite_info(
177 this: ModelHandle<Self>,
178 message: TypedEnvelope<proto::UpdateInviteInfo>,
179 _: Arc<Client>,
180 mut cx: AsyncAppContext,
181 ) -> Result<()> {
182 this.update(&mut cx, |this, cx| {
183 this.invite_info = Some(InviteInfo {
184 url: Arc::from(message.payload.url),
185 count: message.payload.count,
186 });
187 cx.notify();
188 });
189 Ok(())
190 }
191
192 async fn handle_show_contacts(
193 this: ModelHandle<Self>,
194 _: TypedEnvelope<proto::ShowContacts>,
195 _: Arc<Client>,
196 mut cx: AsyncAppContext,
197 ) -> Result<()> {
198 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
199 Ok(())
200 }
201
202 pub fn invite_info(&self) -> Option<&InviteInfo> {
203 self.invite_info.as_ref()
204 }
205
206 pub fn incoming_calls(&self) -> impl 'static + Stream<Item = Call> {
207 self.incoming_calls.subscribe()
208 }
209
210 async fn handle_update_contacts(
211 this: ModelHandle<Self>,
212 message: TypedEnvelope<proto::UpdateContacts>,
213 _: Arc<Client>,
214 mut cx: AsyncAppContext,
215 ) -> Result<()> {
216 this.update(&mut cx, |this, _| {
217 this.update_contacts_tx
218 .unbounded_send(UpdateContacts::Update(message.payload))
219 .unwrap();
220 });
221 Ok(())
222 }
223
    /// Applies one queued `UpdateContacts` message and returns a task that completes
    /// when the message has been fully applied.
    ///
    /// * `Wait` — drops the barrier sender immediately.
    /// * `Clear` — empties the contact, incoming-request and outgoing-request lists,
    ///   then drops the barrier sender. No `notify` is issued for this variant.
    /// * `Update` — loads every user referenced by the message, resolves the message's
    ///   contacts and requests into `Contact`/`User` values, then applies removals and
    ///   upserts to the three lists (each kept sorted by `github_login`), emitting
    ///   `Event::Contact` where appropriate and finally calling `cx.notify()`.
    ///
    /// The returned task fails if loading users or resolving any contact/user fails;
    /// in that case none of the lists are modified.
    fn update_contacts(
        &mut self,
        message: UpdateContacts,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                // Reaching this message means everything queued before it was processed.
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id mentioned anywhere in the message so they can be
                // loaded with a single request.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                    user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(|this, mut cx| async move {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in the call to `get_users`,
                    // so there is no need to parallelize here.
                    let mut updated_contacts = Vec::new();
                    for contact in message.contacts {
                        let should_notify = contact.should_notify;
                        updated_contacts.push((
                            Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
                            should_notify,
                        ));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            let user = this
                                .update(&mut cx, |this, cx| {
                                    this.fetch_user(request.requester_id, cx)
                                })
                                .await?;
                            (user, request.should_notify)
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(&mut cx, |this, cx| this.fetch_user(requested_user_id, cx))
                                .await?,
                        );
                    }

                    // Sets of user ids whose entries must be removed from each list.
                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    // Everything has been resolved; mutate the store in one synchronous step.
                    this.update(&mut cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for (updated_contact, should_notify) in updated_contacts {
                            if should_notify {
                                cx.emit(Event::Contact {
                                    user: updated_contact.user.clone(),
                                    kind: ContactEventKind::Accepted,
                                });
                            }
                            // The list is sorted by login: replace on a hit, otherwise insert at
                            // the position that keeps it sorted.
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for (user, should_notify) in incoming_requests {
                            if should_notify {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Requested,
                                });
                            }

                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    });

                    Ok(())
                })
            }
        }
    }
367
368 pub fn contacts(&self) -> &[Arc<Contact>] {
369 &self.contacts
370 }
371
372 pub fn has_contact(&self, user: &Arc<User>) -> bool {
373 self.contacts
374 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
375 .is_ok()
376 }
377
378 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
379 &self.incoming_contact_requests
380 }
381
382 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
383 &self.outgoing_contact_requests
384 }
385
386 pub fn is_contact_request_pending(&self, user: &User) -> bool {
387 self.pending_contact_requests.contains_key(&user.id)
388 }
389
390 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
391 if self
392 .contacts
393 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
394 .is_ok()
395 {
396 ContactRequestStatus::RequestAccepted
397 } else if self
398 .outgoing_contact_requests
399 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
400 .is_ok()
401 {
402 ContactRequestStatus::RequestSent
403 } else if self
404 .incoming_contact_requests
405 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
406 .is_ok()
407 {
408 ContactRequestStatus::RequestReceived
409 } else {
410 ContactRequestStatus::None
411 }
412 }
413
414 pub fn request_contact(
415 &mut self,
416 responder_id: u64,
417 cx: &mut ModelContext<Self>,
418 ) -> Task<Result<()>> {
419 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
420 }
421
422 pub fn remove_contact(
423 &mut self,
424 user_id: u64,
425 cx: &mut ModelContext<Self>,
426 ) -> Task<Result<()>> {
427 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
428 }
429
430 pub fn respond_to_contact_request(
431 &mut self,
432 requester_id: u64,
433 accept: bool,
434 cx: &mut ModelContext<Self>,
435 ) -> Task<Result<()>> {
436 self.perform_contact_request(
437 requester_id,
438 proto::RespondToContactRequest {
439 requester_id,
440 response: if accept {
441 proto::ContactRequestResponse::Accept
442 } else {
443 proto::ContactRequestResponse::Decline
444 } as i32,
445 },
446 cx,
447 )
448 }
449
450 pub fn dismiss_contact_request(
451 &mut self,
452 requester_id: u64,
453 cx: &mut ModelContext<Self>,
454 ) -> Task<Result<()>> {
455 let client = self.client.upgrade();
456 cx.spawn_weak(|_, _| async move {
457 client
458 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
459 .request(proto::RespondToContactRequest {
460 requester_id,
461 response: proto::ContactRequestResponse::Dismiss as i32,
462 })
463 .await?;
464 Ok(())
465 })
466 }
467
468 fn perform_contact_request<T: RequestMessage>(
469 &mut self,
470 user_id: u64,
471 request: T,
472 cx: &mut ModelContext<Self>,
473 ) -> Task<Result<()>> {
474 let client = self.client.upgrade();
475 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
476 cx.notify();
477
478 cx.spawn(|this, mut cx| async move {
479 let response = client
480 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
481 .request(request)
482 .await;
483 this.update(&mut cx, |this, cx| {
484 if let Entry::Occupied(mut request_count) =
485 this.pending_contact_requests.entry(user_id)
486 {
487 *request_count.get_mut() -= 1;
488 if *request_count.get() == 0 {
489 request_count.remove();
490 }
491 }
492 cx.notify();
493 });
494 response?;
495 Ok(())
496 })
497 }
498
499 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
500 let (tx, mut rx) = postage::barrier::channel();
501 self.update_contacts_tx
502 .unbounded_send(UpdateContacts::Clear(tx))
503 .unwrap();
504 async move {
505 rx.next().await;
506 }
507 }
508
509 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
510 let (tx, mut rx) = postage::barrier::channel();
511 self.update_contacts_tx
512 .unbounded_send(UpdateContacts::Wait(tx))
513 .unwrap();
514 async move {
515 rx.next().await;
516 }
517 }
518
519 pub fn get_users(
520 &mut self,
521 mut user_ids: Vec<u64>,
522 cx: &mut ModelContext<Self>,
523 ) -> Task<Result<()>> {
524 user_ids.retain(|id| !self.users.contains_key(id));
525 if user_ids.is_empty() {
526 Task::ready(Ok(()))
527 } else {
528 let load = self.load_users(proto::GetUsers { user_ids }, cx);
529 cx.foreground().spawn(async move {
530 load.await?;
531 Ok(())
532 })
533 }
534 }
535
536 pub fn fuzzy_search_users(
537 &mut self,
538 query: String,
539 cx: &mut ModelContext<Self>,
540 ) -> Task<Result<Vec<Arc<User>>>> {
541 self.load_users(proto::FuzzySearchUsers { query }, cx)
542 }
543
544 pub fn fetch_user(
545 &mut self,
546 user_id: u64,
547 cx: &mut ModelContext<Self>,
548 ) -> Task<Result<Arc<User>>> {
549 if let Some(user) = self.users.get(&user_id).cloned() {
550 return cx.foreground().spawn(async move { Ok(user) });
551 }
552
553 let load_users = self.get_users(vec![user_id], cx);
554 cx.spawn(|this, mut cx| async move {
555 load_users.await?;
556 this.update(&mut cx, |this, _| {
557 this.users
558 .get(&user_id)
559 .cloned()
560 .ok_or_else(|| anyhow!("server responded with no users"))
561 })
562 })
563 }
564
565 pub fn current_user(&self) -> Option<Arc<User>> {
566 self.current_user.borrow().clone()
567 }
568
569 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
570 self.current_user.clone()
571 }
572
573 fn load_users(
574 &mut self,
575 request: impl RequestMessage<Response = UsersResponse>,
576 cx: &mut ModelContext<Self>,
577 ) -> Task<Result<Vec<Arc<User>>>> {
578 let client = self.client.clone();
579 let http = self.http.clone();
580 cx.spawn_weak(|this, mut cx| async move {
581 if let Some(rpc) = client.upgrade() {
582 let response = rpc.request(request).await.context("error loading users")?;
583 let users = future::join_all(
584 response
585 .users
586 .into_iter()
587 .map(|user| User::new(user, http.as_ref())),
588 )
589 .await;
590
591 if let Some(this) = this.upgrade(&cx) {
592 this.update(&mut cx, |this, _| {
593 for user in &users {
594 this.users.insert(user.id, user.clone());
595 }
596 });
597 }
598 Ok(users)
599 } else {
600 Ok(Vec::new())
601 }
602 })
603 }
604}
605
606impl User {
607 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
608 Arc::new(User {
609 id: message.id,
610 github_login: message.github_login,
611 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
612 })
613 }
614}
615
616impl Contact {
617 async fn from_proto(
618 contact: proto::Contact,
619 user_store: &ModelHandle<UserStore>,
620 cx: &mut AsyncAppContext,
621 ) -> Result<Self> {
622 let user = user_store
623 .update(cx, |user_store, cx| {
624 user_store.fetch_user(contact.user_id, cx)
625 })
626 .await?;
627 let mut projects = Vec::new();
628 for project in contact.projects {
629 let mut guests = BTreeSet::new();
630 for participant_id in project.guests {
631 guests.insert(
632 user_store
633 .update(cx, |user_store, cx| {
634 user_store.fetch_user(participant_id, cx)
635 })
636 .await?,
637 );
638 }
639 projects.push(ProjectMetadata {
640 id: project.id,
641 visible_worktree_root_names: project.visible_worktree_root_names.clone(),
642 guests,
643 });
644 }
645 Ok(Self {
646 user,
647 online: contact.online,
648 projects,
649 })
650 }
651
652 pub fn non_empty_projects(&self) -> impl Iterator<Item = &ProjectMetadata> {
653 self.projects
654 .iter()
655 .filter(|project| !project.visible_worktree_root_names.is_empty())
656 }
657}
658
659async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
660 let mut response = http
661 .get(url, Default::default(), true)
662 .await
663 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
664
665 if !response.status().is_success() {
666 return Err(anyhow!("avatar request failed {:?}", response.status()));
667 }
668
669 let mut body = Vec::new();
670 response
671 .body_mut()
672 .read_to_end(&mut body)
673 .await
674 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
675 let format = image::guess_format(&body)?;
676 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
677 Ok(ImageData::new(image))
678}