1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use crate::incoming_call::IncomingCall;
3use anyhow::{anyhow, Context, Result};
4use collections::{hash_map::Entry, BTreeSet, HashMap, HashSet};
5use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
6use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
7use postage::{sink::Sink, watch};
8use rpc::proto::{RequestMessage, UsersResponse};
9use std::sync::{Arc, Weak};
10use util::TryFutureExt as _;
11
/// A user as known to this client.
#[derive(Debug)]
pub struct User {
    /// Numeric user id; `UserStore` caches users under this key.
    pub id: u64,
    /// GitHub login name; contact and request lists are kept sorted by this field.
    pub github_login: String,
    /// Avatar image downloaded from the user's avatar URL; presumably `None` when the
    /// download or decoding failed (the failure is only logged as a warning).
    pub avatar: Option<Arc<ImageData>>,
}
18
19impl PartialOrd for User {
20 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
21 Some(self.cmp(other))
22 }
23}
24
25impl Ord for User {
26 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
27 self.github_login.cmp(&other.github_login)
28 }
29}
30
31impl PartialEq for User {
32 fn eq(&self, other: &Self) -> bool {
33 self.id == other.id && self.github_login == other.github_login
34 }
35}
36
37impl Eq for User {}
38
/// A contact of the current user, built from a `proto::Contact`.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// Online flag copied from the server's contact message.
    pub online: bool,
    /// Projects listed for this contact in the server's contact message.
    pub projects: Vec<ProjectMetadata>,
}
45
/// Summary of one project attached to a [`Contact`].
#[derive(Clone, Debug, PartialEq)]
pub struct ProjectMetadata {
    /// Project id as sent by the server.
    pub id: u64,
    /// Root names of the project's visible worktrees; may be empty
    /// (see `Contact::non_empty_projects`).
    pub visible_worktree_root_names: Vec<String>,
    /// Users listed as guests of the project, ordered by `User`'s `Ord` impl.
    pub guests: BTreeSet<Arc<User>>,
}
52
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any incoming or outgoing request.
    None,
    /// The user is in the store's outgoing contact requests.
    RequestSent,
    /// The user is in the store's incoming contact requests.
    RequestReceived,
    /// The user is in the store's contact list.
    RequestAccepted,
}
60
/// Model that caches users and tracks the current user's contacts, contact requests,
/// invite info and incoming call.
pub struct UserStore {
    /// Cache of every user loaded so far, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Sending half of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Watch on the signed-in user; written by the `_maintain_current_user` task.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept sorted by GitHub login so they can be binary-searched.
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, kept sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, kept sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact RPCs per user id; an entry is removed when it reaches zero.
    pending_contact_requests: HashMap<u64, usize>,
    /// Most recent invite info received from the server, if any.
    invite_info: Option<InviteInfo>,
    /// Watch channel holding the current incoming call: the sender is written by the
    /// call handlers, the receiver is cloned out to observers.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    /// Weak handle to the RPC client; methods upgrade it on demand.
    client: Weak<Client>,
    /// HTTP client used to download avatars.
    http: Arc<dyn HttpClient>,
    /// Task applying queued contact updates; it also owns the RPC handler subscriptions.
    _maintain_contacts: Task<()>,
    /// Task reacting to client status changes.
    _maintain_current_user: Task<()>,
}
79
/// Invite information copied from a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// `count` field of the message (presumably the number of invites; verify against the server).
    pub count: u32,
    /// `url` field of the message.
    pub url: Arc<str>,
}
85
/// Events emitted by [`UserStore`].
pub enum Event {
    /// Something happened to a contact or contact request involving `user`.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// The server sent a `proto::ShowContacts` message.
    ShowContacts,
}
93
/// What happened in an [`Event::Contact`].
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    /// An incoming contact request arrived with `should_notify` set.
    Requested,
    /// A contact was added or updated with `should_notify` set.
    Accepted,
    /// An incoming contact request was removed.
    Cancelled,
}
100
/// Makes `UserStore` usable as a gpui model that emits [`Event`]s.
impl Entity for UserStore {
    type Event = Event;
}
104
/// Messages processed one at a time, in order, by the `_maintain_contacts` task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier, signalling that everything queued before it was processed.
    Wait(postage::barrier::Sender),
    /// Clear the contacts and both request lists, then drop the barrier.
    Clear(postage::barrier::Sender),
}
110
111impl UserStore {
    /// Creates the store, registers its RPC handlers on `client` and spawns its two
    /// background tasks.
    ///
    /// - `_maintain_contacts` drains the contacts queue, applying one message at a time.
    /// - `_maintain_current_user` follows `client.status()`: it publishes the current
    ///   user on connect and clears contacts on sign-out or connection loss.
    ///
    /// Only a weak reference to `client` is stored in the returned value; the status
    /// task, however, captures the `Arc` itself.
    pub fn new(
        client: Arc<Client>,
        http: Arc<dyn HttpClient>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.handle(), Self::handle_update_contacts),
            client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
            client.add_message_handler(cx.handle(), Self::handle_show_contacts),
            client.add_request_handler(cx.handle(), Self::handle_incoming_call),
            client.add_message_handler(cx.handle(), Self::handle_cancel_call),
        ];
        Self {
            users: Default::default(),
            current_user: current_user_rx,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            incoming_call: watch::channel(),
            client: Arc::downgrade(&client),
            update_contacts_tx,
            http,
            _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
                // Moved into the task so the subscriptions live exactly as long as it does
                // (presumably dropping them unregisters the handlers).
                let _subscriptions = rpc_subscriptions;
                while let Some(message) = update_contacts_rx.next().await {
                    // Messages are dropped once the store itself is gone.
                    if let Some(this) = this.upgrade(&cx) {
                        // Awaiting here serializes updates: the next message is not
                        // taken until this one has been fully applied.
                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
                            .log_err()
                            .await;
                    }
                }
            }),
            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
                let mut status = client.status();
                while let Some(status) = status.next().await {
                    match status {
                        Status::Connected { .. } => {
                            // Requires both a live store and a known user id.
                            if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
                                let user = this
                                    .update(&mut cx, |this, cx| this.get_user(user_id, cx))
                                    .log_err()
                                    .await;
                                // `user` is an `Option`: a failed load is published as `None`.
                                current_user_tx.send(user).await.ok();
                            }
                        }
                        Status::SignedOut => {
                            current_user_tx.send(None).await.ok();
                            if let Some(this) = this.upgrade(&cx) {
                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
                            }
                        }
                        Status::ConnectionLost => {
                            // The current user is kept; only the contact lists are cleared.
                            if let Some(this) = this.upgrade(&cx) {
                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
                            }
                        }
                        _ => {}
                    }
                }
            }),
            pending_contact_requests: Default::default(),
        }
    }
178
179 async fn handle_update_invite_info(
180 this: ModelHandle<Self>,
181 message: TypedEnvelope<proto::UpdateInviteInfo>,
182 _: Arc<Client>,
183 mut cx: AsyncAppContext,
184 ) -> Result<()> {
185 this.update(&mut cx, |this, cx| {
186 this.invite_info = Some(InviteInfo {
187 url: Arc::from(message.payload.url),
188 count: message.payload.count,
189 });
190 cx.notify();
191 });
192 Ok(())
193 }
194
195 async fn handle_show_contacts(
196 this: ModelHandle<Self>,
197 _: TypedEnvelope<proto::ShowContacts>,
198 _: Arc<Client>,
199 mut cx: AsyncAppContext,
200 ) -> Result<()> {
201 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
202 Ok(())
203 }
204
205 async fn handle_incoming_call(
206 this: ModelHandle<Self>,
207 envelope: TypedEnvelope<proto::IncomingCall>,
208 _: Arc<Client>,
209 mut cx: AsyncAppContext,
210 ) -> Result<proto::Ack> {
211 let call = IncomingCall {
212 room_id: envelope.payload.room_id,
213 participants: this
214 .update(&mut cx, |this, cx| {
215 this.get_users(envelope.payload.participant_user_ids, cx)
216 })
217 .await?,
218 caller: this
219 .update(&mut cx, |this, cx| {
220 this.get_user(envelope.payload.caller_user_id, cx)
221 })
222 .await?,
223 };
224 this.update(&mut cx, |this, _| {
225 *this.incoming_call.0.borrow_mut() = Some(call);
226 });
227
228 Ok(proto::Ack {})
229 }
230
231 async fn handle_cancel_call(
232 this: ModelHandle<Self>,
233 _: TypedEnvelope<proto::CancelCall>,
234 _: Arc<Client>,
235 mut cx: AsyncAppContext,
236 ) -> Result<()> {
237 this.update(&mut cx, |this, _| {
238 *this.incoming_call.0.borrow_mut() = None;
239 });
240 Ok(())
241 }
242
    /// Returns the most recently received invite info, or `None` if none has arrived yet.
    pub fn invite_info(&self) -> Option<&InviteInfo> {
        self.invite_info.as_ref()
    }
246
247 pub fn incoming_call(&self) -> watch::Receiver<Option<IncomingCall>> {
248 self.incoming_call.1.clone()
249 }
250
251 pub fn decline_call(&mut self) -> Result<()> {
252 if let Some(client) = self.client.upgrade() {
253 client.send(proto::DeclineCall {})?;
254 }
255 Ok(())
256 }
257
258 async fn handle_update_contacts(
259 this: ModelHandle<Self>,
260 message: TypedEnvelope<proto::UpdateContacts>,
261 _: Arc<Client>,
262 mut cx: AsyncAppContext,
263 ) -> Result<()> {
264 this.update(&mut cx, |this, _| {
265 this.update_contacts_tx
266 .unbounded_send(UpdateContacts::Update(message.payload))
267 .unwrap();
268 });
269 Ok(())
270 }
271
    /// Applies one queued [`UpdateContacts`] message.
    ///
    /// - `Wait` only drops its barrier.
    /// - `Clear` empties the contacts and both request lists, then drops its barrier.
    /// - `Update` loads every user the message refers to, then applies removals and
    ///   upserts to the three lists (all kept sorted by GitHub login), emitting
    ///   `Event::Contact` where appropriate and notifying observers once at the end.
    ///
    /// The returned task fails if any referenced user cannot be loaded; in that case
    /// none of the lists is modified.
    fn update_contacts(
        &mut self,
        message: UpdateContacts,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                // Dropping the sender is what wakes whoever awaits the matching receiver.
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                // NOTE(review): unlike the `Update` branch this does not call
                // `cx.notify()` — confirm observers do not need to hear about the clear.
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id mentioned anywhere in the message so they can
                // be fetched with a single `get_users` call.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                    user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(|this, mut cx| async move {
                    load_users.await?;

                    // Users were fetched by the `get_users` call above and are now cached,
                    // so there is no need to parallelize the lookups below.
                    let mut updated_contacts = Vec::new();
                    for contact in message.contacts {
                        let should_notify = contact.should_notify;
                        updated_contacts.push((
                            Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
                            should_notify,
                        ));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            let user = this
                                .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
                                .await?;
                            (user, request.should_notify)
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    // Everything is resolved; apply all changes in one synchronous update.
                    this.update(&mut cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for (updated_contact, should_notify) in updated_contacts {
                            if should_notify {
                                cx.emit(Event::Contact {
                                    user: updated_contact.user.clone(),
                                    kind: ContactEventKind::Accepted,
                                });
                            }
                            // The list is sorted by login: replace on a hit, otherwise
                            // insert at the position that keeps it sorted.
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for (user, should_notify) in incoming_requests {
                            if should_notify {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Requested,
                                });
                            }

                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    });

                    Ok(())
                })
            }
        }
    }
413
    /// Returns the current contacts, sorted by GitHub login.
    pub fn contacts(&self) -> &[Arc<Contact>] {
        &self.contacts
    }
417
418 pub fn has_contact(&self, user: &Arc<User>) -> bool {
419 self.contacts
420 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
421 .is_ok()
422 }
423
    /// Returns the users who have sent us a contact request, sorted by GitHub login.
    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
        &self.incoming_contact_requests
    }
427
    /// Returns the users we have sent a contact request to, sorted by GitHub login.
    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
        &self.outgoing_contact_requests
    }
431
    /// Returns whether at least one contact RPC concerning `user` (looked up by id) is
    /// still in flight.
    pub fn is_contact_request_pending(&self, user: &User) -> bool {
        self.pending_contact_requests.contains_key(&user.id)
    }
435
436 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
437 if self
438 .contacts
439 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
440 .is_ok()
441 {
442 ContactRequestStatus::RequestAccepted
443 } else if self
444 .outgoing_contact_requests
445 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
446 .is_ok()
447 {
448 ContactRequestStatus::RequestSent
449 } else if self
450 .incoming_contact_requests
451 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
452 .is_ok()
453 {
454 ContactRequestStatus::RequestReceived
455 } else {
456 ContactRequestStatus::None
457 }
458 }
459
460 pub fn request_contact(
461 &mut self,
462 responder_id: u64,
463 cx: &mut ModelContext<Self>,
464 ) -> Task<Result<()>> {
465 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
466 }
467
468 pub fn remove_contact(
469 &mut self,
470 user_id: u64,
471 cx: &mut ModelContext<Self>,
472 ) -> Task<Result<()>> {
473 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
474 }
475
476 pub fn respond_to_contact_request(
477 &mut self,
478 requester_id: u64,
479 accept: bool,
480 cx: &mut ModelContext<Self>,
481 ) -> Task<Result<()>> {
482 self.perform_contact_request(
483 requester_id,
484 proto::RespondToContactRequest {
485 requester_id,
486 response: if accept {
487 proto::ContactRequestResponse::Accept
488 } else {
489 proto::ContactRequestResponse::Decline
490 } as i32,
491 },
492 cx,
493 )
494 }
495
496 pub fn dismiss_contact_request(
497 &mut self,
498 requester_id: u64,
499 cx: &mut ModelContext<Self>,
500 ) -> Task<Result<()>> {
501 let client = self.client.upgrade();
502 cx.spawn_weak(|_, _| async move {
503 client
504 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
505 .request(proto::RespondToContactRequest {
506 requester_id,
507 response: proto::ContactRequestResponse::Dismiss as i32,
508 })
509 .await?;
510 Ok(())
511 })
512 }
513
514 fn perform_contact_request<T: RequestMessage>(
515 &mut self,
516 user_id: u64,
517 request: T,
518 cx: &mut ModelContext<Self>,
519 ) -> Task<Result<()>> {
520 let client = self.client.upgrade();
521 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
522 cx.notify();
523
524 cx.spawn(|this, mut cx| async move {
525 let response = client
526 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
527 .request(request)
528 .await;
529 this.update(&mut cx, |this, cx| {
530 if let Entry::Occupied(mut request_count) =
531 this.pending_contact_requests.entry(user_id)
532 {
533 *request_count.get_mut() -= 1;
534 if *request_count.get() == 0 {
535 request_count.remove();
536 }
537 }
538 cx.notify();
539 });
540 response?;
541 Ok(())
542 })
543 }
544
545 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
546 let (tx, mut rx) = postage::barrier::channel();
547 self.update_contacts_tx
548 .unbounded_send(UpdateContacts::Clear(tx))
549 .unwrap();
550 async move {
551 rx.next().await;
552 }
553 }
554
555 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
556 let (tx, mut rx) = postage::barrier::channel();
557 self.update_contacts_tx
558 .unbounded_send(UpdateContacts::Wait(tx))
559 .unwrap();
560 async move {
561 rx.next().await;
562 }
563 }
564
565 pub fn get_users(
566 &mut self,
567 user_ids: Vec<u64>,
568 cx: &mut ModelContext<Self>,
569 ) -> Task<Result<Vec<Arc<User>>>> {
570 let mut user_ids_to_fetch = user_ids.clone();
571 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
572
573 cx.spawn(|this, mut cx| async move {
574 if !user_ids_to_fetch.is_empty() {
575 this.update(&mut cx, |this, cx| {
576 this.load_users(
577 proto::GetUsers {
578 user_ids: user_ids_to_fetch,
579 },
580 cx,
581 )
582 })
583 .await?;
584 }
585
586 this.read_with(&cx, |this, _| {
587 user_ids
588 .iter()
589 .map(|user_id| {
590 this.users
591 .get(user_id)
592 .cloned()
593 .ok_or_else(|| anyhow!("user {} not found", user_id))
594 })
595 .collect()
596 })
597 })
598 }
599
600 pub fn fuzzy_search_users(
601 &mut self,
602 query: String,
603 cx: &mut ModelContext<Self>,
604 ) -> Task<Result<Vec<Arc<User>>>> {
605 self.load_users(proto::FuzzySearchUsers { query }, cx)
606 }
607
608 pub fn get_user(
609 &mut self,
610 user_id: u64,
611 cx: &mut ModelContext<Self>,
612 ) -> Task<Result<Arc<User>>> {
613 if let Some(user) = self.users.get(&user_id).cloned() {
614 return cx.foreground().spawn(async move { Ok(user) });
615 }
616
617 let load_users = self.get_users(vec![user_id], cx);
618 cx.spawn(|this, mut cx| async move {
619 load_users.await?;
620 this.update(&mut cx, |this, _| {
621 this.users
622 .get(&user_id)
623 .cloned()
624 .ok_or_else(|| anyhow!("server responded with no users"))
625 })
626 })
627 }
628
    /// Returns the value currently held by the current-user watch, i.e. the signed-in
    /// user if one has been published.
    pub fn current_user(&self) -> Option<Arc<User>> {
        self.current_user.borrow().clone()
    }
632
    /// Returns a new receiver for the current-user watch, for callers that want to
    /// observe changes.
    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
        self.current_user.clone()
    }
636
637 fn load_users(
638 &mut self,
639 request: impl RequestMessage<Response = UsersResponse>,
640 cx: &mut ModelContext<Self>,
641 ) -> Task<Result<Vec<Arc<User>>>> {
642 let client = self.client.clone();
643 let http = self.http.clone();
644 cx.spawn_weak(|this, mut cx| async move {
645 if let Some(rpc) = client.upgrade() {
646 let response = rpc.request(request).await.context("error loading users")?;
647 let users = future::join_all(
648 response
649 .users
650 .into_iter()
651 .map(|user| User::new(user, http.as_ref())),
652 )
653 .await;
654
655 if let Some(this) = this.upgrade(&cx) {
656 this.update(&mut cx, |this, _| {
657 for user in &users {
658 this.users.insert(user.id, user.clone());
659 }
660 });
661 }
662 Ok(users)
663 } else {
664 Ok(Vec::new())
665 }
666 })
667 }
668}
669
670impl User {
671 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
672 Arc::new(User {
673 id: message.id,
674 github_login: message.github_login,
675 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
676 })
677 }
678}
679
680impl Contact {
681 async fn from_proto(
682 contact: proto::Contact,
683 user_store: &ModelHandle<UserStore>,
684 cx: &mut AsyncAppContext,
685 ) -> Result<Self> {
686 let user = user_store
687 .update(cx, |user_store, cx| {
688 user_store.get_user(contact.user_id, cx)
689 })
690 .await?;
691 let mut projects = Vec::new();
692 for project in contact.projects {
693 let mut guests = BTreeSet::new();
694 for participant_id in project.guests {
695 guests.insert(
696 user_store
697 .update(cx, |user_store, cx| user_store.get_user(participant_id, cx))
698 .await?,
699 );
700 }
701 projects.push(ProjectMetadata {
702 id: project.id,
703 visible_worktree_root_names: project.visible_worktree_root_names.clone(),
704 guests,
705 });
706 }
707 Ok(Self {
708 user,
709 online: contact.online,
710 projects,
711 })
712 }
713
714 pub fn non_empty_projects(&self) -> impl Iterator<Item = &ProjectMetadata> {
715 self.projects
716 .iter()
717 .filter(|project| !project.visible_worktree_root_names.is_empty())
718 }
719}
720
721async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
722 let mut response = http
723 .get(url, Default::default(), true)
724 .await
725 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
726
727 if !response.status().is_success() {
728 return Err(anyhow!("avatar request failed {:?}", response.status()));
729 }
730
731 let mut body = Vec::new();
732 response
733 .body_mut()
734 .read_to_end(&mut body)
735 .await
736 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
737 let format = image::guess_format(&body)?;
738 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
739 Ok(ImageData::new(image))
740}