1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use crate::call::Call;
3use anyhow::{anyhow, Context, Result};
4use collections::{hash_map::Entry, BTreeSet, HashMap, HashSet};
5use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
6use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
7use postage::{sink::Sink, watch};
8use rpc::proto::{RequestMessage, UsersResponse};
9use std::sync::{Arc, Weak};
10use util::TryFutureExt as _;
11
/// A user account as known to this client.
///
/// Values are built by `User::new` from a `proto::User` message and are handed
/// around behind an `Arc`.
#[derive(Debug)]
pub struct User {
    /// Numeric identifier, copied from `proto::User::id`.
    pub id: u64,
    /// GitHub login name; the primary key used when ordering users.
    pub github_login: String,
    /// Decoded avatar image. Presumably `None` when the avatar could not be
    /// fetched or decoded (the result of `fetch_avatar` goes through `warn_on_err`).
    pub avatar: Option<Arc<ImageData>>,
}
18
19impl PartialOrd for User {
20 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
21 Some(self.cmp(other))
22 }
23}
24
25impl Ord for User {
26 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
27 self.github_login.cmp(&other.github_login)
28 }
29}
30
31impl PartialEq for User {
32 fn eq(&self, other: &Self) -> bool {
33 self.id == other.id && self.github_login == other.github_login
34 }
35}
36
// Marker impl: `User` equality (id and login comparison) is a full equivalence relation.
impl Eq for User {}
38
/// A contact of the current user, as built by `Contact::from_proto`.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact entry refers to.
    pub user: Arc<User>,
    /// Online flag, copied from the `online` field of the proto contact.
    pub online: bool,
    /// One entry per project listed in the proto contact.
    pub projects: Vec<ProjectMetadata>,
}
45
/// Summary of a project attached to a [`Contact`].
#[derive(Clone, Debug, PartialEq)]
pub struct ProjectMetadata {
    /// Project identifier, copied from the proto project.
    pub id: u64,
    /// Names of the project's visible worktree roots; may be empty
    /// (see `Contact::non_empty_projects`).
    pub visible_worktree_root_names: Vec<String>,
    /// Users resolved from the proto project's `guests` id list, kept in `User` order.
    pub guests: BTreeSet<Arc<User>>,
}
52
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any known request.
    None,
    /// The user appears in the store's outgoing contact requests.
    RequestSent,
    /// The user appears in the store's incoming contact requests.
    RequestReceived,
    /// The user appears in the store's contact list.
    RequestAccepted,
}
60
/// Model that caches users and tracks the current user, contacts, contact
/// requests, invite info and the incoming call for one `Client`.
pub struct UserStore {
    /// Cache of loaded users keyed by user id; filled by `load_users`.
    users: HashMap<u64, Arc<User>>,
    /// Sending half of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Watch on the signed-in user; written by the `_maintain_current_user` task.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept sorted by GitHub login (entries are placed via binary search).
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, kept sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, kept sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact RPCs per user id; entries are removed at zero.
    pending_contact_requests: HashMap<u64, usize>,
    /// Most recent invite info received from the server, if any.
    invite_info: Option<InviteInfo>,
    /// Sender/receiver pair holding the current incoming call, if any.
    incoming_call: (watch::Sender<Option<Call>>, watch::Receiver<Option<Call>>),
    /// Weak handle to the RPC client; upgraded whenever a request is made.
    client: Weak<Client>,
    /// HTTP client used to download avatars.
    http: Arc<dyn HttpClient>,
    /// Background task applying queued `UpdateContacts` messages in order.
    _maintain_contacts: Task<()>,
    /// Background task reacting to client status changes.
    _maintain_current_user: Task<()>,
}
76
/// Invite information pushed by the server in an `UpdateInviteInfo` message.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone` so the public
/// type can be logged and compared like the other data types in this module.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct InviteInfo {
    /// Invite counter copied from the message payload
    /// (exact meaning is defined by the server — TODO confirm).
    pub count: u32,
    /// Invite URL copied from the message payload.
    pub url: Arc<str>,
}
82
/// Events emitted by [`UserStore`].
pub enum Event {
    /// Something happened to the contact relationship with `user`; emitted
    /// while applying a contacts update.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `ShowContacts` message is received from the server.
    ShowContacts,
}
90
/// What happened to a contact relationship, carried by `Event::Contact`.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone`/`Copy` so
/// subscribers can log and compare the kind directly.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ContactEventKind {
    /// Emitted for an incoming contact request flagged `should_notify`.
    Requested,
    /// Emitted for an updated contact flagged `should_notify`.
    Accepted,
    /// Emitted when an incoming contact request is removed.
    Cancelled,
}
97
// Makes `UserStore` a gpui entity whose emitted events are of type `Event`.
impl Entity for UserStore {
    type Event = Event;
}
101
/// Messages queued on `UserStore::update_contacts_tx` and applied one at a
/// time by `UserStore::update_contacts`.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Drop the barrier sender once this message is reached in the queue.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
107
108impl UserStore {
109 pub fn new(
110 client: Arc<Client>,
111 http: Arc<dyn HttpClient>,
112 cx: &mut ModelContext<Self>,
113 ) -> Self {
114 let (mut current_user_tx, current_user_rx) = watch::channel();
115 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
116 let rpc_subscriptions = vec![
117 client.add_message_handler(cx.handle(), Self::handle_update_contacts),
118 client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
119 client.add_message_handler(cx.handle(), Self::handle_show_contacts),
120 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
121 client.add_message_handler(cx.handle(), Self::handle_cancel_call),
122 ];
123 Self {
124 users: Default::default(),
125 current_user: current_user_rx,
126 contacts: Default::default(),
127 incoming_contact_requests: Default::default(),
128 outgoing_contact_requests: Default::default(),
129 invite_info: None,
130 incoming_call: watch::channel(),
131 client: Arc::downgrade(&client),
132 update_contacts_tx,
133 http,
134 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
135 let _subscriptions = rpc_subscriptions;
136 while let Some(message) = update_contacts_rx.next().await {
137 if let Some(this) = this.upgrade(&cx) {
138 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
139 .log_err()
140 .await;
141 }
142 }
143 }),
144 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
145 let mut status = client.status();
146 while let Some(status) = status.next().await {
147 match status {
148 Status::Connected { .. } => {
149 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
150 let user = this
151 .update(&mut cx, |this, cx| this.get_user(user_id, cx))
152 .log_err()
153 .await;
154 current_user_tx.send(user).await.ok();
155 }
156 }
157 Status::SignedOut => {
158 current_user_tx.send(None).await.ok();
159 if let Some(this) = this.upgrade(&cx) {
160 this.update(&mut cx, |this, _| this.clear_contacts()).await;
161 }
162 }
163 Status::ConnectionLost => {
164 if let Some(this) = this.upgrade(&cx) {
165 this.update(&mut cx, |this, _| this.clear_contacts()).await;
166 }
167 }
168 _ => {}
169 }
170 }
171 }),
172 pending_contact_requests: Default::default(),
173 }
174 }
175
176 async fn handle_update_invite_info(
177 this: ModelHandle<Self>,
178 message: TypedEnvelope<proto::UpdateInviteInfo>,
179 _: Arc<Client>,
180 mut cx: AsyncAppContext,
181 ) -> Result<()> {
182 this.update(&mut cx, |this, cx| {
183 this.invite_info = Some(InviteInfo {
184 url: Arc::from(message.payload.url),
185 count: message.payload.count,
186 });
187 cx.notify();
188 });
189 Ok(())
190 }
191
192 async fn handle_show_contacts(
193 this: ModelHandle<Self>,
194 _: TypedEnvelope<proto::ShowContacts>,
195 _: Arc<Client>,
196 mut cx: AsyncAppContext,
197 ) -> Result<()> {
198 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
199 Ok(())
200 }
201
202 async fn handle_incoming_call(
203 this: ModelHandle<Self>,
204 envelope: TypedEnvelope<proto::IncomingCall>,
205 _: Arc<Client>,
206 mut cx: AsyncAppContext,
207 ) -> Result<proto::Ack> {
208 let call = Call {
209 room_id: envelope.payload.room_id,
210 participants: this
211 .update(&mut cx, |this, cx| {
212 this.get_users(envelope.payload.participant_user_ids, cx)
213 })
214 .await?,
215 from: this
216 .update(&mut cx, |this, cx| {
217 this.get_user(envelope.payload.from_user_id, cx)
218 })
219 .await?,
220 };
221 this.update(&mut cx, |this, _| {
222 *this.incoming_call.0.borrow_mut() = Some(call);
223 });
224
225 Ok(proto::Ack {})
226 }
227
228 async fn handle_cancel_call(
229 this: ModelHandle<Self>,
230 _: TypedEnvelope<proto::CancelCall>,
231 _: Arc<Client>,
232 mut cx: AsyncAppContext,
233 ) -> Result<()> {
234 this.update(&mut cx, |this, _| {
235 *this.incoming_call.0.borrow_mut() = None;
236 });
237 Ok(())
238 }
239
240 pub fn invite_info(&self) -> Option<&InviteInfo> {
241 self.invite_info.as_ref()
242 }
243
244 pub fn incoming_call(&self) -> watch::Receiver<Option<Call>> {
245 self.incoming_call.1.clone()
246 }
247
248 async fn handle_update_contacts(
249 this: ModelHandle<Self>,
250 message: TypedEnvelope<proto::UpdateContacts>,
251 _: Arc<Client>,
252 mut cx: AsyncAppContext,
253 ) -> Result<()> {
254 this.update(&mut cx, |this, _| {
255 this.update_contacts_tx
256 .unbounded_send(UpdateContacts::Update(message.payload))
257 .unwrap();
258 });
259 Ok(())
260 }
261
/// Applies one queued `UpdateContacts` message.
///
/// * `Wait` drops the barrier sender and finishes immediately.
/// * `Clear` empties the contact list and both request lists, then drops the
///   barrier sender.
/// * `Update` first loads every user referenced by the message, then applies
///   removals and insertions to the three lists and notifies observers.
///
/// The returned task fails if any referenced user cannot be loaded.
fn update_contacts(
    &mut self,
    message: UpdateContacts,
    cx: &mut ModelContext<Self>,
) -> Task<Result<()>> {
    match message {
        UpdateContacts::Wait(barrier) => {
            drop(barrier);
            Task::ready(Ok(()))
        }
        UpdateContacts::Clear(barrier) => {
            self.contacts.clear();
            self.incoming_contact_requests.clear();
            self.outgoing_contact_requests.clear();
            drop(barrier);
            Task::ready(Ok(()))
        }
        UpdateContacts::Update(message) => {
            // Collect every user id the message mentions so they can be
            // loaded with a single `get_users` call.
            let mut user_ids = HashSet::default();
            for contact in &message.contacts {
                user_ids.insert(contact.user_id);
                user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
            }
            user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
            user_ids.extend(message.outgoing_requests.iter());

            let load_users = self.get_users(user_ids.into_iter().collect(), cx);
            cx.spawn(|this, mut cx| async move {
                load_users.await?;

                // Users were fetched in one batch above and cached by `get_users`,
                // so there is no need to parallelize the lookups below.
                let mut updated_contacts = Vec::new();
                for contact in message.contacts {
                    let should_notify = contact.should_notify;
                    updated_contacts.push((
                        Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
                        should_notify,
                    ));
                }

                let mut incoming_requests = Vec::new();
                for request in message.incoming_requests {
                    incoming_requests.push({
                        let user = this
                            .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
                            .await?;
                        (user, request.should_notify)
                    });
                }

                let mut outgoing_requests = Vec::new();
                for requested_user_id in message.outgoing_requests {
                    outgoing_requests.push(
                        this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
                            .await?,
                    );
                }

                let removed_contacts =
                    HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                let removed_incoming_requests =
                    HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                let removed_outgoing_requests =
                    HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                this.update(&mut cx, |this, cx| {
                    // Remove contacts
                    this.contacts
                        .retain(|contact| !removed_contacts.contains(&contact.user.id));
                    // Update existing contacts and insert new ones,
                    // keeping the list sorted by GitHub login
                    for (updated_contact, should_notify) in updated_contacts {
                        if should_notify {
                            cx.emit(Event::Contact {
                                user: updated_contact.user.clone(),
                                kind: ContactEventKind::Accepted,
                            });
                        }
                        match this.contacts.binary_search_by_key(
                            &&updated_contact.user.github_login,
                            |contact| &contact.user.github_login,
                        ) {
                            Ok(ix) => this.contacts[ix] = updated_contact,
                            Err(ix) => this.contacts.insert(ix, updated_contact),
                        }
                    }

                    // Remove incoming contact requests, emitting a `Cancelled`
                    // event for each one that is dropped
                    this.incoming_contact_requests.retain(|user| {
                        if removed_incoming_requests.contains(&user.id) {
                            cx.emit(Event::Contact {
                                user: user.clone(),
                                kind: ContactEventKind::Cancelled,
                            });
                            false
                        } else {
                            true
                        }
                    });
                    // Update existing incoming requests and insert new ones
                    for (user, should_notify) in incoming_requests {
                        if should_notify {
                            cx.emit(Event::Contact {
                                user: user.clone(),
                                kind: ContactEventKind::Requested,
                            });
                        }

                        match this
                            .incoming_contact_requests
                            .binary_search_by_key(&&user.github_login, |contact| {
                                &contact.github_login
                            }) {
                            Ok(ix) => this.incoming_contact_requests[ix] = user,
                            Err(ix) => this.incoming_contact_requests.insert(ix, user),
                        }
                    }

                    // Remove outgoing contact requests
                    this.outgoing_contact_requests
                        .retain(|user| !removed_outgoing_requests.contains(&user.id));
                    // Update existing outgoing requests and insert new ones
                    for request in outgoing_requests {
                        match this
                            .outgoing_contact_requests
                            .binary_search_by_key(&&request.github_login, |contact| {
                                &contact.github_login
                            }) {
                            Ok(ix) => this.outgoing_contact_requests[ix] = request,
                            Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                        }
                    }

                    cx.notify();
                });

                Ok(())
            })
        }
    }
}
403
404 pub fn contacts(&self) -> &[Arc<Contact>] {
405 &self.contacts
406 }
407
408 pub fn has_contact(&self, user: &Arc<User>) -> bool {
409 self.contacts
410 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
411 .is_ok()
412 }
413
414 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
415 &self.incoming_contact_requests
416 }
417
418 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
419 &self.outgoing_contact_requests
420 }
421
422 pub fn is_contact_request_pending(&self, user: &User) -> bool {
423 self.pending_contact_requests.contains_key(&user.id)
424 }
425
426 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
427 if self
428 .contacts
429 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
430 .is_ok()
431 {
432 ContactRequestStatus::RequestAccepted
433 } else if self
434 .outgoing_contact_requests
435 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
436 .is_ok()
437 {
438 ContactRequestStatus::RequestSent
439 } else if self
440 .incoming_contact_requests
441 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
442 .is_ok()
443 {
444 ContactRequestStatus::RequestReceived
445 } else {
446 ContactRequestStatus::None
447 }
448 }
449
450 pub fn request_contact(
451 &mut self,
452 responder_id: u64,
453 cx: &mut ModelContext<Self>,
454 ) -> Task<Result<()>> {
455 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
456 }
457
458 pub fn remove_contact(
459 &mut self,
460 user_id: u64,
461 cx: &mut ModelContext<Self>,
462 ) -> Task<Result<()>> {
463 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
464 }
465
466 pub fn respond_to_contact_request(
467 &mut self,
468 requester_id: u64,
469 accept: bool,
470 cx: &mut ModelContext<Self>,
471 ) -> Task<Result<()>> {
472 self.perform_contact_request(
473 requester_id,
474 proto::RespondToContactRequest {
475 requester_id,
476 response: if accept {
477 proto::ContactRequestResponse::Accept
478 } else {
479 proto::ContactRequestResponse::Decline
480 } as i32,
481 },
482 cx,
483 )
484 }
485
486 pub fn dismiss_contact_request(
487 &mut self,
488 requester_id: u64,
489 cx: &mut ModelContext<Self>,
490 ) -> Task<Result<()>> {
491 let client = self.client.upgrade();
492 cx.spawn_weak(|_, _| async move {
493 client
494 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
495 .request(proto::RespondToContactRequest {
496 requester_id,
497 response: proto::ContactRequestResponse::Dismiss as i32,
498 })
499 .await?;
500 Ok(())
501 })
502 }
503
504 fn perform_contact_request<T: RequestMessage>(
505 &mut self,
506 user_id: u64,
507 request: T,
508 cx: &mut ModelContext<Self>,
509 ) -> Task<Result<()>> {
510 let client = self.client.upgrade();
511 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
512 cx.notify();
513
514 cx.spawn(|this, mut cx| async move {
515 let response = client
516 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
517 .request(request)
518 .await;
519 this.update(&mut cx, |this, cx| {
520 if let Entry::Occupied(mut request_count) =
521 this.pending_contact_requests.entry(user_id)
522 {
523 *request_count.get_mut() -= 1;
524 if *request_count.get() == 0 {
525 request_count.remove();
526 }
527 }
528 cx.notify();
529 });
530 response?;
531 Ok(())
532 })
533 }
534
535 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
536 let (tx, mut rx) = postage::barrier::channel();
537 self.update_contacts_tx
538 .unbounded_send(UpdateContacts::Clear(tx))
539 .unwrap();
540 async move {
541 rx.next().await;
542 }
543 }
544
545 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
546 let (tx, mut rx) = postage::barrier::channel();
547 self.update_contacts_tx
548 .unbounded_send(UpdateContacts::Wait(tx))
549 .unwrap();
550 async move {
551 rx.next().await;
552 }
553 }
554
555 pub fn get_users(
556 &mut self,
557 user_ids: Vec<u64>,
558 cx: &mut ModelContext<Self>,
559 ) -> Task<Result<Vec<Arc<User>>>> {
560 let mut user_ids_to_fetch = user_ids.clone();
561 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
562
563 cx.spawn(|this, mut cx| async move {
564 if !user_ids_to_fetch.is_empty() {
565 this.update(&mut cx, |this, cx| {
566 this.load_users(
567 proto::GetUsers {
568 user_ids: user_ids_to_fetch,
569 },
570 cx,
571 )
572 })
573 .await?;
574 }
575
576 this.read_with(&cx, |this, _| {
577 user_ids
578 .iter()
579 .map(|user_id| {
580 this.users
581 .get(user_id)
582 .cloned()
583 .ok_or_else(|| anyhow!("user {} not found", user_id))
584 })
585 .collect()
586 })
587 })
588 }
589
590 pub fn fuzzy_search_users(
591 &mut self,
592 query: String,
593 cx: &mut ModelContext<Self>,
594 ) -> Task<Result<Vec<Arc<User>>>> {
595 self.load_users(proto::FuzzySearchUsers { query }, cx)
596 }
597
598 pub fn get_user(
599 &mut self,
600 user_id: u64,
601 cx: &mut ModelContext<Self>,
602 ) -> Task<Result<Arc<User>>> {
603 if let Some(user) = self.users.get(&user_id).cloned() {
604 return cx.foreground().spawn(async move { Ok(user) });
605 }
606
607 let load_users = self.get_users(vec![user_id], cx);
608 cx.spawn(|this, mut cx| async move {
609 load_users.await?;
610 this.update(&mut cx, |this, _| {
611 this.users
612 .get(&user_id)
613 .cloned()
614 .ok_or_else(|| anyhow!("server responded with no users"))
615 })
616 })
617 }
618
619 pub fn current_user(&self) -> Option<Arc<User>> {
620 self.current_user.borrow().clone()
621 }
622
623 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
624 self.current_user.clone()
625 }
626
627 fn load_users(
628 &mut self,
629 request: impl RequestMessage<Response = UsersResponse>,
630 cx: &mut ModelContext<Self>,
631 ) -> Task<Result<Vec<Arc<User>>>> {
632 let client = self.client.clone();
633 let http = self.http.clone();
634 cx.spawn_weak(|this, mut cx| async move {
635 if let Some(rpc) = client.upgrade() {
636 let response = rpc.request(request).await.context("error loading users")?;
637 let users = future::join_all(
638 response
639 .users
640 .into_iter()
641 .map(|user| User::new(user, http.as_ref())),
642 )
643 .await;
644
645 if let Some(this) = this.upgrade(&cx) {
646 this.update(&mut cx, |this, _| {
647 for user in &users {
648 this.users.insert(user.id, user.clone());
649 }
650 });
651 }
652 Ok(users)
653 } else {
654 Ok(Vec::new())
655 }
656 })
657 }
658}
659
660impl User {
661 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
662 Arc::new(User {
663 id: message.id,
664 github_login: message.github_login,
665 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
666 })
667 }
668}
669
670impl Contact {
671 async fn from_proto(
672 contact: proto::Contact,
673 user_store: &ModelHandle<UserStore>,
674 cx: &mut AsyncAppContext,
675 ) -> Result<Self> {
676 let user = user_store
677 .update(cx, |user_store, cx| {
678 user_store.get_user(contact.user_id, cx)
679 })
680 .await?;
681 let mut projects = Vec::new();
682 for project in contact.projects {
683 let mut guests = BTreeSet::new();
684 for participant_id in project.guests {
685 guests.insert(
686 user_store
687 .update(cx, |user_store, cx| user_store.get_user(participant_id, cx))
688 .await?,
689 );
690 }
691 projects.push(ProjectMetadata {
692 id: project.id,
693 visible_worktree_root_names: project.visible_worktree_root_names.clone(),
694 guests,
695 });
696 }
697 Ok(Self {
698 user,
699 online: contact.online,
700 projects,
701 })
702 }
703
704 pub fn non_empty_projects(&self) -> impl Iterator<Item = &ProjectMetadata> {
705 self.projects
706 .iter()
707 .filter(|project| !project.visible_worktree_root_names.is_empty())
708 }
709}
710
711async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
712 let mut response = http
713 .get(url, Default::default(), true)
714 .await
715 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
716
717 if !response.status().is_success() {
718 return Err(anyhow!("avatar request failed {:?}", response.status()));
719 }
720
721 let mut body = Vec::new();
722 response
723 .body_mut()
724 .read_to_end(&mut body)
725 .await
726 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
727 let format = image::guess_format(&body)?;
728 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
729 Ok(ImageData::new(image))
730}