1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use crate::call::Call;
3use anyhow::{anyhow, Context, Result};
4use collections::{hash_map::Entry, BTreeSet, HashMap, HashSet};
5use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
6use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
7use postage::{sink::Sink, watch};
8use rpc::proto::{RequestMessage, UsersResponse};
9use std::sync::{Arc, Weak};
10use util::TryFutureExt as _;
11
/// A user known to this client.
#[derive(Debug)]
pub struct User {
    /// Numeric user id; `UserStore` caches users under this key.
    pub id: u64,
    /// GitHub login; the primary key used when ordering users.
    pub github_login: String,
    /// Avatar image, or `None` when no avatar could be obtained.
    pub avatar: Option<Arc<ImageData>>,
}
18
19impl PartialOrd for User {
20 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
21 Some(self.cmp(other))
22 }
23}
24
25impl Ord for User {
26 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
27 self.github_login.cmp(&other.github_login)
28 }
29}
30
31impl PartialEq for User {
32 fn eq(&self, other: &Self) -> bool {
33 self.id == other.id && self.github_login == other.github_login
34 }
35}
36
// Marker impl: the `PartialEq` comparison on `id` and `github_login` is a full equivalence relation.
impl Eq for User {}
38
/// A contact entry, built from a `proto::Contact` message by `Contact::from_proto`.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact entry refers to.
    pub user: Arc<User>,
    /// Online flag copied from the `proto::Contact` message.
    pub online: bool,
    /// Projects listed for this contact in the `proto::Contact` message.
    pub projects: Vec<ProjectMetadata>,
}
45
/// Summary of one project attached to a [`Contact`].
#[derive(Clone, Debug, PartialEq)]
pub struct ProjectMetadata {
    /// Project id as received in the contact message.
    pub id: u64,
    /// Root names of the project's visible worktrees; may be empty
    /// (see `Contact::non_empty_projects`).
    pub visible_worktree_root_names: Vec<String>,
    /// Guests of the project, resolved from the guest user ids in the message.
    pub guests: BTreeSet<Arc<User>>,
}
52
/// Relationship between the store's contact lists and a given user, as reported
/// by `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user appears in none of the contact or request lists.
    None,
    /// The user appears in the outgoing contact requests.
    RequestSent,
    /// The user appears in the incoming contact requests.
    RequestReceived,
    /// The user appears in the contacts list.
    RequestAccepted,
}
60
/// Model that caches users and tracks contacts, contact requests, invite info
/// and the current incoming call for one client connection.
pub struct UserStore {
    /// Cache of loaded users, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Sending half of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Watch on the current user; written by the `_maintain_current_user` task.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept sorted by the contact user's GitHub login.
    contacts: Vec<Arc<Contact>>,
    /// Users who sent a contact request, kept sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users a contact request was sent to, kept sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact-related requests per user id.
    pending_contact_requests: HashMap<u64, usize>,
    /// Most recent invite info received via `proto::UpdateInviteInfo`, if any.
    invite_info: Option<InviteInfo>,
    /// Watch channel (sender, receiver) holding the current incoming call, if any.
    incoming_call: (watch::Sender<Option<Call>>, watch::Receiver<Option<Call>>),
    /// Weak reference to the RPC client; upgraded whenever a request is sent.
    client: Weak<Client>,
    /// HTTP client used to fetch user avatars.
    http: Arc<dyn HttpClient>,
    /// Task that applies queued `UpdateContacts` messages one at a time.
    _maintain_contacts: Task<()>,
    /// Task that reacts to client status changes (current user, clearing contacts).
    _maintain_current_user: Task<()>,
}
76
/// Invite information copied from a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The `count` field of the message.
    // NOTE(review): presumably the number of invites — confirm against the protocol definition.
    pub count: u32,
    /// The `url` field of the message.
    pub url: Arc<str>,
}
82
/// Events emitted by [`UserStore`].
pub enum Event {
    /// A contact-related change concerning `user`; `kind` says what happened.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
}
90
/// The kind of change reported by [`Event::Contact`].
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    /// An incoming contact request arrived with its `should_notify` flag set.
    Requested,
    /// A contact was added or updated with its `should_notify` flag set.
    Accepted,
    /// An incoming contact request was removed.
    Cancelled,
}
97
/// Makes `UserStore` a gpui entity whose emitted event type is [`Event`].
impl Entity for UserStore {
    type Event = Event;
}
101
/// Messages queued on `update_contacts_tx` and handled by `UserStore::update_contacts`.
enum UpdateContacts {
    /// Apply a contacts update received from the RPC connection.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender once this message is reached.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
107
108impl UserStore {
    /// Creates a store bound to `client`.
    ///
    /// Registers the RPC handlers for contact, invite, show-contacts and call
    /// messages, and spawns two weakly-held background tasks:
    /// one applying queued contact updates in order, and one tracking the
    /// client's connection status to maintain the current user and to clear
    /// contacts on sign-out or connection loss.
    ///
    /// `http` is kept for fetching user avatars.
    pub fn new(
        client: Arc<Client>,
        http: Arc<dyn HttpClient>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.handle(), Self::handle_update_contacts),
            client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
            client.add_message_handler(cx.handle(), Self::handle_show_contacts),
            client.add_request_handler(cx.handle(), Self::handle_incoming_call),
            client.add_message_handler(cx.handle(), Self::handle_cancel_call),
        ];
        Self {
            users: Default::default(),
            current_user: current_user_rx,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            incoming_call: watch::channel(),
            client: Arc::downgrade(&client),
            update_contacts_tx,
            http,
            _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
                // Move the subscriptions into the task so they are held for as
                // long as the task itself is alive.
                let _subscriptions = rpc_subscriptions;
                // Each message is fully applied (awaited) before the next one is
                // taken, so updates are processed strictly in queue order.
                while let Some(message) = update_contacts_rx.next().await {
                    if let Some(this) = this.upgrade(&cx) {
                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
                            .log_err()
                            .await;
                    }
                }
            }),
            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
                let mut status = client.status();
                while let Some(status) = status.next().await {
                    match status {
                        Status::Connected { .. } => {
                            // Only proceeds when the store is still alive and the
                            // client reports a user id.
                            if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
                                // A failed lookup is logged and published as `None`.
                                let user = this
                                    .update(&mut cx, |this, cx| this.get_user(user_id, cx))
                                    .log_err()
                                    .await;
                                current_user_tx.send(user).await.ok();
                            }
                        }
                        Status::SignedOut => {
                            current_user_tx.send(None).await.ok();
                            if let Some(this) = this.upgrade(&cx) {
                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
                            }
                        }
                        Status::ConnectionLost => {
                            // The current user is left untouched here; only the
                            // contact lists are cleared.
                            if let Some(this) = this.upgrade(&cx) {
                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
                            }
                        }
                        _ => {}
                    }
                }
            }),
            pending_contact_requests: Default::default(),
        }
    }
175
176 async fn handle_update_invite_info(
177 this: ModelHandle<Self>,
178 message: TypedEnvelope<proto::UpdateInviteInfo>,
179 _: Arc<Client>,
180 mut cx: AsyncAppContext,
181 ) -> Result<()> {
182 this.update(&mut cx, |this, cx| {
183 this.invite_info = Some(InviteInfo {
184 url: Arc::from(message.payload.url),
185 count: message.payload.count,
186 });
187 cx.notify();
188 });
189 Ok(())
190 }
191
192 async fn handle_show_contacts(
193 this: ModelHandle<Self>,
194 _: TypedEnvelope<proto::ShowContacts>,
195 _: Arc<Client>,
196 mut cx: AsyncAppContext,
197 ) -> Result<()> {
198 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
199 Ok(())
200 }
201
202 async fn handle_incoming_call(
203 this: ModelHandle<Self>,
204 envelope: TypedEnvelope<proto::IncomingCall>,
205 _: Arc<Client>,
206 mut cx: AsyncAppContext,
207 ) -> Result<proto::Ack> {
208 let call = Call {
209 room_id: envelope.payload.room_id,
210 participants: this
211 .update(&mut cx, |this, cx| {
212 this.get_users(envelope.payload.participant_user_ids, cx)
213 })
214 .await?,
215 from: this
216 .update(&mut cx, |this, cx| {
217 this.get_user(envelope.payload.caller_user_id, cx)
218 })
219 .await?,
220 };
221 this.update(&mut cx, |this, _| {
222 *this.incoming_call.0.borrow_mut() = Some(call);
223 });
224
225 Ok(proto::Ack {})
226 }
227
228 async fn handle_cancel_call(
229 this: ModelHandle<Self>,
230 _: TypedEnvelope<proto::CancelCall>,
231 _: Arc<Client>,
232 mut cx: AsyncAppContext,
233 ) -> Result<()> {
234 this.update(&mut cx, |this, _| {
235 *this.incoming_call.0.borrow_mut() = None;
236 });
237 Ok(())
238 }
239
    /// Returns the most recently received invite info, or `None` if none has
    /// been received yet.
    pub fn invite_info(&self) -> Option<&InviteInfo> {
        self.invite_info.as_ref()
    }
243
    /// Returns a new receiver for the incoming-call watch channel; its value
    /// is `Some(call)` while a call is incoming and `None` otherwise.
    pub fn incoming_call(&self) -> watch::Receiver<Option<Call>> {
        self.incoming_call.1.clone()
    }
247
248 pub fn decline_call(&mut self) -> Result<()> {
249 if let Some(client) = self.client.upgrade() {
250 client.send(proto::DeclineCall {})?;
251 }
252 Ok(())
253 }
254
255 async fn handle_update_contacts(
256 this: ModelHandle<Self>,
257 message: TypedEnvelope<proto::UpdateContacts>,
258 _: Arc<Client>,
259 mut cx: AsyncAppContext,
260 ) -> Result<()> {
261 this.update(&mut cx, |this, _| {
262 this.update_contacts_tx
263 .unbounded_send(UpdateContacts::Update(message.payload))
264 .unwrap();
265 });
266 Ok(())
267 }
268
    /// Applies one queued [`UpdateContacts`] message to the store.
    ///
    /// * `Wait` only drops its barrier sender.
    /// * `Clear` empties the contacts and both request lists, then drops its
    ///   barrier sender.
    /// * `Update` first loads every user referenced by the message, then removes
    ///   and upserts contacts, incoming requests and outgoing requests, emitting
    ///   `Event::Contact` where applicable, and finally notifies observers.
    ///
    /// The returned task resolves once the message has been fully applied; it
    /// fails if any referenced user cannot be loaded, in which case the lists
    /// are left unmodified.
    fn update_contacts(
        &mut self,
        message: UpdateContacts,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect the ids of every user the message refers to so they can
                // be loaded with a single `get_users` call.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                    user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(|this, mut cx| async move {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    for contact in message.contacts {
                        let should_notify = contact.should_notify;
                        updated_contacts.push((
                            Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
                            should_notify,
                        ));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            let user = this
                                .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
                                .await?;
                            (user, request.should_notify)
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    // All fallible work is done; apply the changes in one update so
                    // observers never see a partially applied message.
                    this.update(&mut cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for (updated_contact, should_notify) in updated_contacts {
                            if should_notify {
                                cx.emit(Event::Contact {
                                    user: updated_contact.user.clone(),
                                    kind: ContactEventKind::Accepted,
                                });
                            }
                            // The list is kept sorted by GitHub login: replace on a
                            // match, otherwise insert at the sorted position.
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for (user, should_notify) in incoming_requests {
                            if should_notify {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Requested,
                                });
                            }

                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    });

                    Ok(())
                })
            }
        }
    }
410
    /// Returns the current contacts, sorted by the contact user's GitHub login.
    pub fn contacts(&self) -> &[Arc<Contact>] {
        &self.contacts
    }
414
415 pub fn has_contact(&self, user: &Arc<User>) -> bool {
416 self.contacts
417 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
418 .is_ok()
419 }
420
    /// Returns the users with a pending incoming contact request, sorted by
    /// GitHub login.
    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
        &self.incoming_contact_requests
    }
424
    /// Returns the users with a pending outgoing contact request, sorted by
    /// GitHub login.
    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
        &self.outgoing_contact_requests
    }
428
    /// Returns `true` while at least one contact-related request concerning
    /// `user` (matched by id) is still in flight.
    pub fn is_contact_request_pending(&self, user: &User) -> bool {
        self.pending_contact_requests.contains_key(&user.id)
    }
432
433 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
434 if self
435 .contacts
436 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
437 .is_ok()
438 {
439 ContactRequestStatus::RequestAccepted
440 } else if self
441 .outgoing_contact_requests
442 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
443 .is_ok()
444 {
445 ContactRequestStatus::RequestSent
446 } else if self
447 .incoming_contact_requests
448 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
449 .is_ok()
450 {
451 ContactRequestStatus::RequestReceived
452 } else {
453 ContactRequestStatus::None
454 }
455 }
456
    /// Sends a `proto::RequestContact` for `responder_id`, tracking it as a
    /// pending contact request until the response arrives.
    pub fn request_contact(
        &mut self,
        responder_id: u64,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
    }
464
    /// Sends a `proto::RemoveContact` for `user_id`, tracking it as a pending
    /// contact request until the response arrives.
    pub fn remove_contact(
        &mut self,
        user_id: u64,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
    }
472
473 pub fn respond_to_contact_request(
474 &mut self,
475 requester_id: u64,
476 accept: bool,
477 cx: &mut ModelContext<Self>,
478 ) -> Task<Result<()>> {
479 self.perform_contact_request(
480 requester_id,
481 proto::RespondToContactRequest {
482 requester_id,
483 response: if accept {
484 proto::ContactRequestResponse::Accept
485 } else {
486 proto::ContactRequestResponse::Decline
487 } as i32,
488 },
489 cx,
490 )
491 }
492
493 pub fn dismiss_contact_request(
494 &mut self,
495 requester_id: u64,
496 cx: &mut ModelContext<Self>,
497 ) -> Task<Result<()>> {
498 let client = self.client.upgrade();
499 cx.spawn_weak(|_, _| async move {
500 client
501 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
502 .request(proto::RespondToContactRequest {
503 requester_id,
504 response: proto::ContactRequestResponse::Dismiss as i32,
505 })
506 .await?;
507 Ok(())
508 })
509 }
510
511 fn perform_contact_request<T: RequestMessage>(
512 &mut self,
513 user_id: u64,
514 request: T,
515 cx: &mut ModelContext<Self>,
516 ) -> Task<Result<()>> {
517 let client = self.client.upgrade();
518 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
519 cx.notify();
520
521 cx.spawn(|this, mut cx| async move {
522 let response = client
523 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
524 .request(request)
525 .await;
526 this.update(&mut cx, |this, cx| {
527 if let Entry::Occupied(mut request_count) =
528 this.pending_contact_requests.entry(user_id)
529 {
530 *request_count.get_mut() -= 1;
531 if *request_count.get() == 0 {
532 request_count.remove();
533 }
534 }
535 cx.notify();
536 });
537 response?;
538 Ok(())
539 })
540 }
541
542 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
543 let (tx, mut rx) = postage::barrier::channel();
544 self.update_contacts_tx
545 .unbounded_send(UpdateContacts::Clear(tx))
546 .unwrap();
547 async move {
548 rx.next().await;
549 }
550 }
551
552 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
553 let (tx, mut rx) = postage::barrier::channel();
554 self.update_contacts_tx
555 .unbounded_send(UpdateContacts::Wait(tx))
556 .unwrap();
557 async move {
558 rx.next().await;
559 }
560 }
561
562 pub fn get_users(
563 &mut self,
564 user_ids: Vec<u64>,
565 cx: &mut ModelContext<Self>,
566 ) -> Task<Result<Vec<Arc<User>>>> {
567 let mut user_ids_to_fetch = user_ids.clone();
568 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
569
570 cx.spawn(|this, mut cx| async move {
571 if !user_ids_to_fetch.is_empty() {
572 this.update(&mut cx, |this, cx| {
573 this.load_users(
574 proto::GetUsers {
575 user_ids: user_ids_to_fetch,
576 },
577 cx,
578 )
579 })
580 .await?;
581 }
582
583 this.read_with(&cx, |this, _| {
584 user_ids
585 .iter()
586 .map(|user_id| {
587 this.users
588 .get(user_id)
589 .cloned()
590 .ok_or_else(|| anyhow!("user {} not found", user_id))
591 })
592 .collect()
593 })
594 })
595 }
596
    /// Sends a `proto::FuzzySearchUsers` request for `query` and returns the
    /// users in the response; they are also added to the user cache.
    pub fn fuzzy_search_users(
        &mut self,
        query: String,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Vec<Arc<User>>>> {
        self.load_users(proto::FuzzySearchUsers { query }, cx)
    }
604
605 pub fn get_user(
606 &mut self,
607 user_id: u64,
608 cx: &mut ModelContext<Self>,
609 ) -> Task<Result<Arc<User>>> {
610 if let Some(user) = self.users.get(&user_id).cloned() {
611 return cx.foreground().spawn(async move { Ok(user) });
612 }
613
614 let load_users = self.get_users(vec![user_id], cx);
615 cx.spawn(|this, mut cx| async move {
616 load_users.await?;
617 this.update(&mut cx, |this, _| {
618 this.users
619 .get(&user_id)
620 .cloned()
621 .ok_or_else(|| anyhow!("server responded with no users"))
622 })
623 })
624 }
625
    /// Returns a snapshot of the current user, or `None` when no current user
    /// has been published (for example after sign-out).
    pub fn current_user(&self) -> Option<Arc<User>> {
        self.current_user.borrow().clone()
    }
629
    /// Returns a new receiver for the current-user watch channel, for callers
    /// that want to observe changes of the current user.
    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
        self.current_user.clone()
    }
633
    /// Sends `request` (any request answered with a `UsersResponse`), converts
    /// the returned users — fetching their avatars concurrently — and inserts
    /// them into the user cache.
    ///
    /// Returns the loaded users. If the client has already been dropped, no
    /// request is made and an empty list is returned. If the store itself has
    /// been dropped by the time the response arrives, the users are returned
    /// without being cached.
    fn load_users(
        &mut self,
        request: impl RequestMessage<Response = UsersResponse>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Vec<Arc<User>>>> {
        let client = self.client.clone();
        let http = self.http.clone();
        cx.spawn_weak(|this, mut cx| async move {
            if let Some(rpc) = client.upgrade() {
                let response = rpc.request(request).await.context("error loading users")?;
                // Build all users at once so their avatar fetches run concurrently.
                let users = future::join_all(
                    response
                        .users
                        .into_iter()
                        .map(|user| User::new(user, http.as_ref())),
                )
                .await;

                if let Some(this) = this.upgrade(&cx) {
                    this.update(&mut cx, |this, _| {
                        for user in &users {
                            this.users.insert(user.id, user.clone());
                        }
                    });
                }
                Ok(users)
            } else {
                Ok(Vec::new())
            }
        })
    }
665}
666
667impl User {
668 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
669 Arc::new(User {
670 id: message.id,
671 github_login: message.github_login,
672 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
673 })
674 }
675}
676
677impl Contact {
678 async fn from_proto(
679 contact: proto::Contact,
680 user_store: &ModelHandle<UserStore>,
681 cx: &mut AsyncAppContext,
682 ) -> Result<Self> {
683 let user = user_store
684 .update(cx, |user_store, cx| {
685 user_store.get_user(contact.user_id, cx)
686 })
687 .await?;
688 let mut projects = Vec::new();
689 for project in contact.projects {
690 let mut guests = BTreeSet::new();
691 for participant_id in project.guests {
692 guests.insert(
693 user_store
694 .update(cx, |user_store, cx| user_store.get_user(participant_id, cx))
695 .await?,
696 );
697 }
698 projects.push(ProjectMetadata {
699 id: project.id,
700 visible_worktree_root_names: project.visible_worktree_root_names.clone(),
701 guests,
702 });
703 }
704 Ok(Self {
705 user,
706 online: contact.online,
707 projects,
708 })
709 }
710
711 pub fn non_empty_projects(&self) -> impl Iterator<Item = &ProjectMetadata> {
712 self.projects
713 .iter()
714 .filter(|project| !project.visible_worktree_root_names.is_empty())
715 }
716}
717
718async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
719 let mut response = http
720 .get(url, Default::default(), true)
721 .await
722 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
723
724 if !response.status().is_success() {
725 return Err(anyhow!("avatar request failed {:?}", response.status()));
726 }
727
728 let mut body = Vec::new();
729 response
730 .body_mut()
731 .read_to_end(&mut body)
732 .await
733 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
734 let format = image::guess_format(&body)?;
735 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
736 Ok(ImageData::new(image))
737}