1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use crate::call::Call;
3use anyhow::{anyhow, Context, Result};
4use collections::{hash_map::Entry, BTreeSet, HashMap, HashSet};
5use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
6use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
7use postage::{sink::Sink, watch};
8use rpc::proto::{RequestMessage, UsersResponse};
9use std::sync::{Arc, Weak};
10use util::TryFutureExt as _;
11
12#[derive(Debug)]
13pub struct User {
14 pub id: u64,
15 pub github_login: String,
16 pub avatar: Option<Arc<ImageData>>,
17}
18
19impl PartialOrd for User {
20 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
21 Some(self.cmp(other))
22 }
23}
24
25impl Ord for User {
26 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
27 self.github_login.cmp(&other.github_login)
28 }
29}
30
31impl PartialEq for User {
32 fn eq(&self, other: &Self) -> bool {
33 self.id == other.id && self.github_login == other.github_login
34 }
35}
36
37impl Eq for User {}
38
39#[derive(Debug, PartialEq)]
40pub struct Contact {
41 pub user: Arc<User>,
42 pub online: bool,
43 pub projects: Vec<ProjectMetadata>,
44}
45
46#[derive(Clone, Debug, PartialEq)]
47pub struct ProjectMetadata {
48 pub id: u64,
49 pub visible_worktree_root_names: Vec<String>,
50 pub guests: BTreeSet<Arc<User>>,
51}
52
/// Relationship between the current user and another user, as far as contact
/// requests are concerned.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any known request.
    None,
    /// The user appears among the outgoing contact requests.
    RequestSent,
    /// The user appears among the incoming contact requests.
    RequestReceived,
    /// The user is already in the contact list.
    RequestAccepted,
}
60
61pub struct UserStore {
62 users: HashMap<u64, Arc<User>>,
63 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
64 current_user: watch::Receiver<Option<Arc<User>>>,
65 contacts: Vec<Arc<Contact>>,
66 incoming_contact_requests: Vec<Arc<User>>,
67 outgoing_contact_requests: Vec<Arc<User>>,
68 pending_contact_requests: HashMap<u64, usize>,
69 invite_info: Option<InviteInfo>,
70 incoming_call: (watch::Sender<Option<Call>>, watch::Receiver<Option<Call>>),
71 client: Weak<Client>,
72 http: Arc<dyn HttpClient>,
73 _maintain_contacts: Task<()>,
74 _maintain_current_user: Task<()>,
75}
76
/// Invite information received through an `UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The `count` value carried by the message.
    pub count: u32,
    /// The `url` value carried by the message.
    pub url: Arc<str>,
}
82
83pub enum Event {
84 Contact {
85 user: Arc<User>,
86 kind: ContactEventKind,
87 },
88 ShowContacts,
89}
90
/// The kind of change reported by `Event::Contact`.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    /// A new incoming contact request was flagged for notification.
    Requested,
    /// A contact entry was flagged for notification.
    Accepted,
    /// An incoming contact request was removed.
    Cancelled,
}
97
98impl Entity for UserStore {
99 type Event = Event;
100}
101
102enum UpdateContacts {
103 Update(proto::UpdateContacts),
104 Wait(postage::barrier::Sender),
105 Clear(postage::barrier::Sender),
106}
107
108impl UserStore {
109 pub fn new(
110 client: Arc<Client>,
111 http: Arc<dyn HttpClient>,
112 cx: &mut ModelContext<Self>,
113 ) -> Self {
114 let (mut current_user_tx, current_user_rx) = watch::channel();
115 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
116 let rpc_subscriptions = vec![
117 client.add_message_handler(cx.handle(), Self::handle_update_contacts),
118 client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
119 client.add_message_handler(cx.handle(), Self::handle_show_contacts),
120 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
121 client.add_message_handler(cx.handle(), Self::handle_cancel_call),
122 ];
123 Self {
124 users: Default::default(),
125 current_user: current_user_rx,
126 contacts: Default::default(),
127 incoming_contact_requests: Default::default(),
128 outgoing_contact_requests: Default::default(),
129 invite_info: None,
130 incoming_call: watch::channel(),
131 client: Arc::downgrade(&client),
132 update_contacts_tx,
133 http,
134 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
135 let _subscriptions = rpc_subscriptions;
136 while let Some(message) = update_contacts_rx.next().await {
137 if let Some(this) = this.upgrade(&cx) {
138 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
139 .log_err()
140 .await;
141 }
142 }
143 }),
144 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
145 let mut status = client.status();
146 while let Some(status) = status.next().await {
147 match status {
148 Status::Connected { .. } => {
149 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
150 let user = this
151 .update(&mut cx, |this, cx| this.get_user(user_id, cx))
152 .log_err()
153 .await;
154 current_user_tx.send(user).await.ok();
155 }
156 }
157 Status::SignedOut => {
158 current_user_tx.send(None).await.ok();
159 if let Some(this) = this.upgrade(&cx) {
160 this.update(&mut cx, |this, _| this.clear_contacts()).await;
161 }
162 }
163 Status::ConnectionLost => {
164 if let Some(this) = this.upgrade(&cx) {
165 this.update(&mut cx, |this, _| this.clear_contacts()).await;
166 }
167 }
168 _ => {}
169 }
170 }
171 }),
172 pending_contact_requests: Default::default(),
173 }
174 }
175
176 async fn handle_update_invite_info(
177 this: ModelHandle<Self>,
178 message: TypedEnvelope<proto::UpdateInviteInfo>,
179 _: Arc<Client>,
180 mut cx: AsyncAppContext,
181 ) -> Result<()> {
182 this.update(&mut cx, |this, cx| {
183 this.invite_info = Some(InviteInfo {
184 url: Arc::from(message.payload.url),
185 count: message.payload.count,
186 });
187 cx.notify();
188 });
189 Ok(())
190 }
191
192 async fn handle_show_contacts(
193 this: ModelHandle<Self>,
194 _: TypedEnvelope<proto::ShowContacts>,
195 _: Arc<Client>,
196 mut cx: AsyncAppContext,
197 ) -> Result<()> {
198 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
199 Ok(())
200 }
201
202 async fn handle_incoming_call(
203 this: ModelHandle<Self>,
204 envelope: TypedEnvelope<proto::IncomingCall>,
205 _: Arc<Client>,
206 mut cx: AsyncAppContext,
207 ) -> Result<proto::Ack> {
208 let call = Call {
209 room_id: envelope.payload.room_id,
210 participants: this
211 .update(&mut cx, |this, cx| {
212 this.get_users(envelope.payload.participant_user_ids, cx)
213 })
214 .await?,
215 from: this
216 .update(&mut cx, |this, cx| {
217 this.get_user(envelope.payload.from_user_id, cx)
218 })
219 .await?,
220 };
221 this.update(&mut cx, |this, _| {
222 *this.incoming_call.0.borrow_mut() = Some(call);
223 });
224
225 Ok(proto::Ack {})
226 }
227
228 async fn handle_cancel_call(
229 this: ModelHandle<Self>,
230 _: TypedEnvelope<proto::CancelCall>,
231 _: Arc<Client>,
232 mut cx: AsyncAppContext,
233 ) -> Result<()> {
234 this.update(&mut cx, |this, _| {
235 *this.incoming_call.0.borrow_mut() = None;
236 });
237 Ok(())
238 }
239
240 pub fn invite_info(&self) -> Option<&InviteInfo> {
241 self.invite_info.as_ref()
242 }
243
244 pub fn incoming_call(&self) -> watch::Receiver<Option<Call>> {
245 self.incoming_call.1.clone()
246 }
247
248 pub fn decline_call(&mut self) -> Result<()> {
249 let mut incoming_call = self.incoming_call.0.borrow_mut();
250 if incoming_call.is_some() {
251 if let Some(client) = self.client.upgrade() {
252 client.send(proto::DeclineCall {})?;
253 }
254 *incoming_call = None;
255 }
256 Ok(())
257 }
258
259 async fn handle_update_contacts(
260 this: ModelHandle<Self>,
261 message: TypedEnvelope<proto::UpdateContacts>,
262 _: Arc<Client>,
263 mut cx: AsyncAppContext,
264 ) -> Result<()> {
265 this.update(&mut cx, |this, _| {
266 this.update_contacts_tx
267 .unbounded_send(UpdateContacts::Update(message.payload))
268 .unwrap();
269 });
270 Ok(())
271 }
272
273 fn update_contacts(
274 &mut self,
275 message: UpdateContacts,
276 cx: &mut ModelContext<Self>,
277 ) -> Task<Result<()>> {
278 match message {
279 UpdateContacts::Wait(barrier) => {
280 drop(barrier);
281 Task::ready(Ok(()))
282 }
283 UpdateContacts::Clear(barrier) => {
284 self.contacts.clear();
285 self.incoming_contact_requests.clear();
286 self.outgoing_contact_requests.clear();
287 drop(barrier);
288 Task::ready(Ok(()))
289 }
290 UpdateContacts::Update(message) => {
291 let mut user_ids = HashSet::default();
292 for contact in &message.contacts {
293 user_ids.insert(contact.user_id);
294 user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
295 }
296 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
297 user_ids.extend(message.outgoing_requests.iter());
298
299 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
300 cx.spawn(|this, mut cx| async move {
301 load_users.await?;
302
303 // Users are fetched in parallel above and cached in call to get_users
304 // No need to paralellize here
305 let mut updated_contacts = Vec::new();
306 for contact in message.contacts {
307 let should_notify = contact.should_notify;
308 updated_contacts.push((
309 Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
310 should_notify,
311 ));
312 }
313
314 let mut incoming_requests = Vec::new();
315 for request in message.incoming_requests {
316 incoming_requests.push({
317 let user = this
318 .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
319 .await?;
320 (user, request.should_notify)
321 });
322 }
323
324 let mut outgoing_requests = Vec::new();
325 for requested_user_id in message.outgoing_requests {
326 outgoing_requests.push(
327 this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
328 .await?,
329 );
330 }
331
332 let removed_contacts =
333 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
334 let removed_incoming_requests =
335 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
336 let removed_outgoing_requests =
337 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
338
339 this.update(&mut cx, |this, cx| {
340 // Remove contacts
341 this.contacts
342 .retain(|contact| !removed_contacts.contains(&contact.user.id));
343 // Update existing contacts and insert new ones
344 for (updated_contact, should_notify) in updated_contacts {
345 if should_notify {
346 cx.emit(Event::Contact {
347 user: updated_contact.user.clone(),
348 kind: ContactEventKind::Accepted,
349 });
350 }
351 match this.contacts.binary_search_by_key(
352 &&updated_contact.user.github_login,
353 |contact| &contact.user.github_login,
354 ) {
355 Ok(ix) => this.contacts[ix] = updated_contact,
356 Err(ix) => this.contacts.insert(ix, updated_contact),
357 }
358 }
359
360 // Remove incoming contact requests
361 this.incoming_contact_requests.retain(|user| {
362 if removed_incoming_requests.contains(&user.id) {
363 cx.emit(Event::Contact {
364 user: user.clone(),
365 kind: ContactEventKind::Cancelled,
366 });
367 false
368 } else {
369 true
370 }
371 });
372 // Update existing incoming requests and insert new ones
373 for (user, should_notify) in incoming_requests {
374 if should_notify {
375 cx.emit(Event::Contact {
376 user: user.clone(),
377 kind: ContactEventKind::Requested,
378 });
379 }
380
381 match this
382 .incoming_contact_requests
383 .binary_search_by_key(&&user.github_login, |contact| {
384 &contact.github_login
385 }) {
386 Ok(ix) => this.incoming_contact_requests[ix] = user,
387 Err(ix) => this.incoming_contact_requests.insert(ix, user),
388 }
389 }
390
391 // Remove outgoing contact requests
392 this.outgoing_contact_requests
393 .retain(|user| !removed_outgoing_requests.contains(&user.id));
394 // Update existing incoming requests and insert new ones
395 for request in outgoing_requests {
396 match this
397 .outgoing_contact_requests
398 .binary_search_by_key(&&request.github_login, |contact| {
399 &contact.github_login
400 }) {
401 Ok(ix) => this.outgoing_contact_requests[ix] = request,
402 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
403 }
404 }
405
406 cx.notify();
407 });
408
409 Ok(())
410 })
411 }
412 }
413 }
414
415 pub fn contacts(&self) -> &[Arc<Contact>] {
416 &self.contacts
417 }
418
419 pub fn has_contact(&self, user: &Arc<User>) -> bool {
420 self.contacts
421 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
422 .is_ok()
423 }
424
425 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
426 &self.incoming_contact_requests
427 }
428
429 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
430 &self.outgoing_contact_requests
431 }
432
433 pub fn is_contact_request_pending(&self, user: &User) -> bool {
434 self.pending_contact_requests.contains_key(&user.id)
435 }
436
437 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
438 if self
439 .contacts
440 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
441 .is_ok()
442 {
443 ContactRequestStatus::RequestAccepted
444 } else if self
445 .outgoing_contact_requests
446 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
447 .is_ok()
448 {
449 ContactRequestStatus::RequestSent
450 } else if self
451 .incoming_contact_requests
452 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
453 .is_ok()
454 {
455 ContactRequestStatus::RequestReceived
456 } else {
457 ContactRequestStatus::None
458 }
459 }
460
461 pub fn request_contact(
462 &mut self,
463 responder_id: u64,
464 cx: &mut ModelContext<Self>,
465 ) -> Task<Result<()>> {
466 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
467 }
468
469 pub fn remove_contact(
470 &mut self,
471 user_id: u64,
472 cx: &mut ModelContext<Self>,
473 ) -> Task<Result<()>> {
474 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
475 }
476
477 pub fn respond_to_contact_request(
478 &mut self,
479 requester_id: u64,
480 accept: bool,
481 cx: &mut ModelContext<Self>,
482 ) -> Task<Result<()>> {
483 self.perform_contact_request(
484 requester_id,
485 proto::RespondToContactRequest {
486 requester_id,
487 response: if accept {
488 proto::ContactRequestResponse::Accept
489 } else {
490 proto::ContactRequestResponse::Decline
491 } as i32,
492 },
493 cx,
494 )
495 }
496
497 pub fn dismiss_contact_request(
498 &mut self,
499 requester_id: u64,
500 cx: &mut ModelContext<Self>,
501 ) -> Task<Result<()>> {
502 let client = self.client.upgrade();
503 cx.spawn_weak(|_, _| async move {
504 client
505 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
506 .request(proto::RespondToContactRequest {
507 requester_id,
508 response: proto::ContactRequestResponse::Dismiss as i32,
509 })
510 .await?;
511 Ok(())
512 })
513 }
514
515 fn perform_contact_request<T: RequestMessage>(
516 &mut self,
517 user_id: u64,
518 request: T,
519 cx: &mut ModelContext<Self>,
520 ) -> Task<Result<()>> {
521 let client = self.client.upgrade();
522 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
523 cx.notify();
524
525 cx.spawn(|this, mut cx| async move {
526 let response = client
527 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
528 .request(request)
529 .await;
530 this.update(&mut cx, |this, cx| {
531 if let Entry::Occupied(mut request_count) =
532 this.pending_contact_requests.entry(user_id)
533 {
534 *request_count.get_mut() -= 1;
535 if *request_count.get() == 0 {
536 request_count.remove();
537 }
538 }
539 cx.notify();
540 });
541 response?;
542 Ok(())
543 })
544 }
545
546 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
547 let (tx, mut rx) = postage::barrier::channel();
548 self.update_contacts_tx
549 .unbounded_send(UpdateContacts::Clear(tx))
550 .unwrap();
551 async move {
552 rx.next().await;
553 }
554 }
555
556 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
557 let (tx, mut rx) = postage::barrier::channel();
558 self.update_contacts_tx
559 .unbounded_send(UpdateContacts::Wait(tx))
560 .unwrap();
561 async move {
562 rx.next().await;
563 }
564 }
565
566 pub fn get_users(
567 &mut self,
568 user_ids: Vec<u64>,
569 cx: &mut ModelContext<Self>,
570 ) -> Task<Result<Vec<Arc<User>>>> {
571 let mut user_ids_to_fetch = user_ids.clone();
572 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
573
574 cx.spawn(|this, mut cx| async move {
575 if !user_ids_to_fetch.is_empty() {
576 this.update(&mut cx, |this, cx| {
577 this.load_users(
578 proto::GetUsers {
579 user_ids: user_ids_to_fetch,
580 },
581 cx,
582 )
583 })
584 .await?;
585 }
586
587 this.read_with(&cx, |this, _| {
588 user_ids
589 .iter()
590 .map(|user_id| {
591 this.users
592 .get(user_id)
593 .cloned()
594 .ok_or_else(|| anyhow!("user {} not found", user_id))
595 })
596 .collect()
597 })
598 })
599 }
600
601 pub fn fuzzy_search_users(
602 &mut self,
603 query: String,
604 cx: &mut ModelContext<Self>,
605 ) -> Task<Result<Vec<Arc<User>>>> {
606 self.load_users(proto::FuzzySearchUsers { query }, cx)
607 }
608
609 pub fn get_user(
610 &mut self,
611 user_id: u64,
612 cx: &mut ModelContext<Self>,
613 ) -> Task<Result<Arc<User>>> {
614 if let Some(user) = self.users.get(&user_id).cloned() {
615 return cx.foreground().spawn(async move { Ok(user) });
616 }
617
618 let load_users = self.get_users(vec![user_id], cx);
619 cx.spawn(|this, mut cx| async move {
620 load_users.await?;
621 this.update(&mut cx, |this, _| {
622 this.users
623 .get(&user_id)
624 .cloned()
625 .ok_or_else(|| anyhow!("server responded with no users"))
626 })
627 })
628 }
629
630 pub fn current_user(&self) -> Option<Arc<User>> {
631 self.current_user.borrow().clone()
632 }
633
634 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
635 self.current_user.clone()
636 }
637
638 fn load_users(
639 &mut self,
640 request: impl RequestMessage<Response = UsersResponse>,
641 cx: &mut ModelContext<Self>,
642 ) -> Task<Result<Vec<Arc<User>>>> {
643 let client = self.client.clone();
644 let http = self.http.clone();
645 cx.spawn_weak(|this, mut cx| async move {
646 if let Some(rpc) = client.upgrade() {
647 let response = rpc.request(request).await.context("error loading users")?;
648 let users = future::join_all(
649 response
650 .users
651 .into_iter()
652 .map(|user| User::new(user, http.as_ref())),
653 )
654 .await;
655
656 if let Some(this) = this.upgrade(&cx) {
657 this.update(&mut cx, |this, _| {
658 for user in &users {
659 this.users.insert(user.id, user.clone());
660 }
661 });
662 }
663 Ok(users)
664 } else {
665 Ok(Vec::new())
666 }
667 })
668 }
669}
670
671impl User {
672 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
673 Arc::new(User {
674 id: message.id,
675 github_login: message.github_login,
676 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
677 })
678 }
679}
680
681impl Contact {
682 async fn from_proto(
683 contact: proto::Contact,
684 user_store: &ModelHandle<UserStore>,
685 cx: &mut AsyncAppContext,
686 ) -> Result<Self> {
687 let user = user_store
688 .update(cx, |user_store, cx| {
689 user_store.get_user(contact.user_id, cx)
690 })
691 .await?;
692 let mut projects = Vec::new();
693 for project in contact.projects {
694 let mut guests = BTreeSet::new();
695 for participant_id in project.guests {
696 guests.insert(
697 user_store
698 .update(cx, |user_store, cx| user_store.get_user(participant_id, cx))
699 .await?,
700 );
701 }
702 projects.push(ProjectMetadata {
703 id: project.id,
704 visible_worktree_root_names: project.visible_worktree_root_names.clone(),
705 guests,
706 });
707 }
708 Ok(Self {
709 user,
710 online: contact.online,
711 projects,
712 })
713 }
714
715 pub fn non_empty_projects(&self) -> impl Iterator<Item = &ProjectMetadata> {
716 self.projects
717 .iter()
718 .filter(|project| !project.visible_worktree_root_names.is_empty())
719 }
720}
721
722async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
723 let mut response = http
724 .get(url, Default::default(), true)
725 .await
726 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
727
728 if !response.status().is_success() {
729 return Err(anyhow!("avatar request failed {:?}", response.status()));
730 }
731
732 let mut body = Vec::new();
733 response
734 .body_mut()
735 .read_to_end(&mut body)
736 .await
737 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
738 let format = image::guess_format(&body)?;
739 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
740 Ok(ImageData::new(image))
741}