1use super::{proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use feature_flags::FeatureFlagAppExt;
5use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
6use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
7use postage::{sink::Sink, watch};
8use rpc::proto::{RequestMessage, UsersResponse};
9use std::sync::{Arc, Weak};
10use text::ReplicaId;
11use util::http::HttpClient;
12use util::TryFutureExt as _;
13
14pub type UserId = u64;
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub struct ParticipantIndex(pub u32);
18
19#[derive(Default, Debug)]
20pub struct User {
21 pub id: UserId,
22 pub github_login: String,
23 pub avatar: Option<Arc<ImageData>>,
24}
25
26#[derive(Clone, Debug, PartialEq, Eq)]
27pub struct Collaborator {
28 pub peer_id: proto::PeerId,
29 pub replica_id: ReplicaId,
30 pub user_id: UserId,
31}
32
33impl PartialOrd for User {
34 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
35 Some(self.cmp(other))
36 }
37}
38
39impl Ord for User {
40 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
41 self.github_login.cmp(&other.github_login)
42 }
43}
44
45impl PartialEq for User {
46 fn eq(&self, other: &Self) -> bool {
47 self.id == other.id && self.github_login == other.github_login
48 }
49}
50
51impl Eq for User {}
52
53#[derive(Debug, PartialEq)]
54pub struct Contact {
55 pub user: Arc<User>,
56 pub online: bool,
57 pub busy: bool,
58}
59
60#[derive(Debug, Clone, Copy, PartialEq, Eq)]
61pub enum ContactRequestStatus {
62 None,
63 RequestSent,
64 RequestReceived,
65 RequestAccepted,
66}
67
68pub struct UserStore {
69 users: HashMap<u64, Arc<User>>,
70 participant_indices: HashMap<u64, ParticipantIndex>,
71 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
72 current_user: watch::Receiver<Option<Arc<User>>>,
73 contacts: Vec<Arc<Contact>>,
74 incoming_contact_requests: Vec<Arc<User>>,
75 outgoing_contact_requests: Vec<Arc<User>>,
76 pending_contact_requests: HashMap<u64, usize>,
77 invite_info: Option<InviteInfo>,
78 client: Weak<Client>,
79 http: Arc<dyn HttpClient>,
80 _maintain_contacts: Task<()>,
81 _maintain_current_user: Task<()>,
82}
83
84#[derive(Clone)]
85pub struct InviteInfo {
86 pub count: u32,
87 pub url: Arc<str>,
88}
89
90pub enum Event {
91 Contact {
92 user: Arc<User>,
93 kind: ContactEventKind,
94 },
95 ShowContacts,
96 ParticipantIndicesChanged,
97}
98
99#[derive(Clone, Copy)]
100pub enum ContactEventKind {
101 Requested,
102 Accepted,
103 Cancelled,
104}
105
106impl Entity for UserStore {
107 type Event = Event;
108}
109
110enum UpdateContacts {
111 Update(proto::UpdateContacts),
112 Wait(postage::barrier::Sender),
113 Clear(postage::barrier::Sender),
114}
115
116impl UserStore {
117 pub fn new(
118 client: Arc<Client>,
119 http: Arc<dyn HttpClient>,
120 cx: &mut ModelContext<Self>,
121 ) -> Self {
122 let (mut current_user_tx, current_user_rx) = watch::channel();
123 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
124 let rpc_subscriptions = vec![
125 client.add_message_handler(cx.handle(), Self::handle_update_contacts),
126 client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
127 client.add_message_handler(cx.handle(), Self::handle_show_contacts),
128 ];
129 Self {
130 users: Default::default(),
131 current_user: current_user_rx,
132 contacts: Default::default(),
133 incoming_contact_requests: Default::default(),
134 participant_indices: Default::default(),
135 outgoing_contact_requests: Default::default(),
136 invite_info: None,
137 client: Arc::downgrade(&client),
138 update_contacts_tx,
139 http,
140 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
141 let _subscriptions = rpc_subscriptions;
142 while let Some(message) = update_contacts_rx.next().await {
143 if let Some(this) = this.upgrade(&cx) {
144 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
145 .log_err()
146 .await;
147 }
148 }
149 }),
150 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
151 let mut status = client.status();
152 while let Some(status) = status.next().await {
153 match status {
154 Status::Connected { .. } => {
155 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
156 let fetch_user = this
157 .update(&mut cx, |this, cx| this.get_user(user_id, cx))
158 .log_err();
159 let fetch_metrics_id =
160 client.request(proto::GetPrivateUserInfo {}).log_err();
161 let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
162
163 if let Some(info) = info {
164 cx.update(|cx| {
165 cx.update_flags(info.staff, info.flags);
166 client.telemetry.set_authenticated_user_info(
167 Some(info.metrics_id.clone()),
168 info.staff,
169 cx,
170 )
171 });
172 } else {
173 cx.read(|cx| {
174 client
175 .telemetry
176 .set_authenticated_user_info(None, false, cx)
177 });
178 }
179
180 current_user_tx.send(user).await.ok();
181
182 this.update(&mut cx, |_, cx| {
183 cx.notify();
184 });
185 }
186 }
187 Status::SignedOut => {
188 current_user_tx.send(None).await.ok();
189 if let Some(this) = this.upgrade(&cx) {
190 this.update(&mut cx, |this, cx| {
191 cx.notify();
192 this.clear_contacts()
193 })
194 .await;
195 }
196 }
197 Status::ConnectionLost => {
198 if let Some(this) = this.upgrade(&cx) {
199 this.update(&mut cx, |this, cx| {
200 cx.notify();
201 this.clear_contacts()
202 })
203 .await;
204 }
205 }
206 _ => {}
207 }
208 }
209 }),
210 pending_contact_requests: Default::default(),
211 }
212 }
213
214 #[cfg(feature = "test-support")]
215 pub fn clear_cache(&mut self) {
216 self.users.clear();
217 }
218
219 async fn handle_update_invite_info(
220 this: ModelHandle<Self>,
221 message: TypedEnvelope<proto::UpdateInviteInfo>,
222 _: Arc<Client>,
223 mut cx: AsyncAppContext,
224 ) -> Result<()> {
225 this.update(&mut cx, |this, cx| {
226 this.invite_info = Some(InviteInfo {
227 url: Arc::from(message.payload.url),
228 count: message.payload.count,
229 });
230 cx.notify();
231 });
232 Ok(())
233 }
234
235 async fn handle_show_contacts(
236 this: ModelHandle<Self>,
237 _: TypedEnvelope<proto::ShowContacts>,
238 _: Arc<Client>,
239 mut cx: AsyncAppContext,
240 ) -> Result<()> {
241 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
242 Ok(())
243 }
244
245 pub fn invite_info(&self) -> Option<&InviteInfo> {
246 self.invite_info.as_ref()
247 }
248
249 async fn handle_update_contacts(
250 this: ModelHandle<Self>,
251 message: TypedEnvelope<proto::UpdateContacts>,
252 _: Arc<Client>,
253 mut cx: AsyncAppContext,
254 ) -> Result<()> {
255 this.update(&mut cx, |this, _| {
256 this.update_contacts_tx
257 .unbounded_send(UpdateContacts::Update(message.payload))
258 .unwrap();
259 });
260 Ok(())
261 }
262
263 fn update_contacts(
264 &mut self,
265 message: UpdateContacts,
266 cx: &mut ModelContext<Self>,
267 ) -> Task<Result<()>> {
268 match message {
269 UpdateContacts::Wait(barrier) => {
270 drop(barrier);
271 Task::ready(Ok(()))
272 }
273 UpdateContacts::Clear(barrier) => {
274 self.contacts.clear();
275 self.incoming_contact_requests.clear();
276 self.outgoing_contact_requests.clear();
277 drop(barrier);
278 Task::ready(Ok(()))
279 }
280 UpdateContacts::Update(message) => {
281 let mut user_ids = HashSet::default();
282 for contact in &message.contacts {
283 user_ids.insert(contact.user_id);
284 }
285 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
286 user_ids.extend(message.outgoing_requests.iter());
287
288 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
289 cx.spawn(|this, mut cx| async move {
290 load_users.await?;
291
292 // Users are fetched in parallel above and cached in call to get_users
293 // No need to paralellize here
294 let mut updated_contacts = Vec::new();
295 for contact in message.contacts {
296 let should_notify = contact.should_notify;
297 updated_contacts.push((
298 Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
299 should_notify,
300 ));
301 }
302
303 let mut incoming_requests = Vec::new();
304 for request in message.incoming_requests {
305 incoming_requests.push({
306 let user = this
307 .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
308 .await?;
309 (user, request.should_notify)
310 });
311 }
312
313 let mut outgoing_requests = Vec::new();
314 for requested_user_id in message.outgoing_requests {
315 outgoing_requests.push(
316 this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
317 .await?,
318 );
319 }
320
321 let removed_contacts =
322 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
323 let removed_incoming_requests =
324 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
325 let removed_outgoing_requests =
326 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
327
328 this.update(&mut cx, |this, cx| {
329 // Remove contacts
330 this.contacts
331 .retain(|contact| !removed_contacts.contains(&contact.user.id));
332 // Update existing contacts and insert new ones
333 for (updated_contact, should_notify) in updated_contacts {
334 if should_notify {
335 cx.emit(Event::Contact {
336 user: updated_contact.user.clone(),
337 kind: ContactEventKind::Accepted,
338 });
339 }
340 match this.contacts.binary_search_by_key(
341 &&updated_contact.user.github_login,
342 |contact| &contact.user.github_login,
343 ) {
344 Ok(ix) => this.contacts[ix] = updated_contact,
345 Err(ix) => this.contacts.insert(ix, updated_contact),
346 }
347 }
348
349 // Remove incoming contact requests
350 this.incoming_contact_requests.retain(|user| {
351 if removed_incoming_requests.contains(&user.id) {
352 cx.emit(Event::Contact {
353 user: user.clone(),
354 kind: ContactEventKind::Cancelled,
355 });
356 false
357 } else {
358 true
359 }
360 });
361 // Update existing incoming requests and insert new ones
362 for (user, should_notify) in incoming_requests {
363 if should_notify {
364 cx.emit(Event::Contact {
365 user: user.clone(),
366 kind: ContactEventKind::Requested,
367 });
368 }
369
370 match this
371 .incoming_contact_requests
372 .binary_search_by_key(&&user.github_login, |contact| {
373 &contact.github_login
374 }) {
375 Ok(ix) => this.incoming_contact_requests[ix] = user,
376 Err(ix) => this.incoming_contact_requests.insert(ix, user),
377 }
378 }
379
380 // Remove outgoing contact requests
381 this.outgoing_contact_requests
382 .retain(|user| !removed_outgoing_requests.contains(&user.id));
383 // Update existing incoming requests and insert new ones
384 for request in outgoing_requests {
385 match this
386 .outgoing_contact_requests
387 .binary_search_by_key(&&request.github_login, |contact| {
388 &contact.github_login
389 }) {
390 Ok(ix) => this.outgoing_contact_requests[ix] = request,
391 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
392 }
393 }
394
395 cx.notify();
396 });
397
398 Ok(())
399 })
400 }
401 }
402 }
403
404 pub fn contacts(&self) -> &[Arc<Contact>] {
405 &self.contacts
406 }
407
408 pub fn has_contact(&self, user: &Arc<User>) -> bool {
409 self.contacts
410 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
411 .is_ok()
412 }
413
414 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
415 &self.incoming_contact_requests
416 }
417
418 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
419 &self.outgoing_contact_requests
420 }
421
422 pub fn is_contact_request_pending(&self, user: &User) -> bool {
423 self.pending_contact_requests.contains_key(&user.id)
424 }
425
426 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
427 if self
428 .contacts
429 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
430 .is_ok()
431 {
432 ContactRequestStatus::RequestAccepted
433 } else if self
434 .outgoing_contact_requests
435 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
436 .is_ok()
437 {
438 ContactRequestStatus::RequestSent
439 } else if self
440 .incoming_contact_requests
441 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
442 .is_ok()
443 {
444 ContactRequestStatus::RequestReceived
445 } else {
446 ContactRequestStatus::None
447 }
448 }
449
450 pub fn request_contact(
451 &mut self,
452 responder_id: u64,
453 cx: &mut ModelContext<Self>,
454 ) -> Task<Result<()>> {
455 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
456 }
457
458 pub fn remove_contact(
459 &mut self,
460 user_id: u64,
461 cx: &mut ModelContext<Self>,
462 ) -> Task<Result<()>> {
463 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
464 }
465
466 pub fn respond_to_contact_request(
467 &mut self,
468 requester_id: u64,
469 accept: bool,
470 cx: &mut ModelContext<Self>,
471 ) -> Task<Result<()>> {
472 self.perform_contact_request(
473 requester_id,
474 proto::RespondToContactRequest {
475 requester_id,
476 response: if accept {
477 proto::ContactRequestResponse::Accept
478 } else {
479 proto::ContactRequestResponse::Decline
480 } as i32,
481 },
482 cx,
483 )
484 }
485
486 pub fn dismiss_contact_request(
487 &mut self,
488 requester_id: u64,
489 cx: &mut ModelContext<Self>,
490 ) -> Task<Result<()>> {
491 let client = self.client.upgrade();
492 cx.spawn_weak(|_, _| async move {
493 client
494 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
495 .request(proto::RespondToContactRequest {
496 requester_id,
497 response: proto::ContactRequestResponse::Dismiss as i32,
498 })
499 .await?;
500 Ok(())
501 })
502 }
503
504 fn perform_contact_request<T: RequestMessage>(
505 &mut self,
506 user_id: u64,
507 request: T,
508 cx: &mut ModelContext<Self>,
509 ) -> Task<Result<()>> {
510 let client = self.client.upgrade();
511 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
512 cx.notify();
513
514 cx.spawn(|this, mut cx| async move {
515 let response = client
516 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
517 .request(request)
518 .await;
519 this.update(&mut cx, |this, cx| {
520 if let Entry::Occupied(mut request_count) =
521 this.pending_contact_requests.entry(user_id)
522 {
523 *request_count.get_mut() -= 1;
524 if *request_count.get() == 0 {
525 request_count.remove();
526 }
527 }
528 cx.notify();
529 });
530 response?;
531 Ok(())
532 })
533 }
534
535 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
536 let (tx, mut rx) = postage::barrier::channel();
537 self.update_contacts_tx
538 .unbounded_send(UpdateContacts::Clear(tx))
539 .unwrap();
540 async move {
541 rx.next().await;
542 }
543 }
544
545 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
546 let (tx, mut rx) = postage::barrier::channel();
547 self.update_contacts_tx
548 .unbounded_send(UpdateContacts::Wait(tx))
549 .unwrap();
550 async move {
551 rx.next().await;
552 }
553 }
554
555 pub fn get_users(
556 &mut self,
557 user_ids: Vec<u64>,
558 cx: &mut ModelContext<Self>,
559 ) -> Task<Result<Vec<Arc<User>>>> {
560 let mut user_ids_to_fetch = user_ids.clone();
561 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
562
563 cx.spawn(|this, mut cx| async move {
564 if !user_ids_to_fetch.is_empty() {
565 this.update(&mut cx, |this, cx| {
566 this.load_users(
567 proto::GetUsers {
568 user_ids: user_ids_to_fetch,
569 },
570 cx,
571 )
572 })
573 .await?;
574 }
575
576 this.read_with(&cx, |this, _| {
577 user_ids
578 .iter()
579 .map(|user_id| {
580 this.users
581 .get(user_id)
582 .cloned()
583 .ok_or_else(|| anyhow!("user {} not found", user_id))
584 })
585 .collect()
586 })
587 })
588 }
589
590 pub fn fuzzy_search_users(
591 &mut self,
592 query: String,
593 cx: &mut ModelContext<Self>,
594 ) -> Task<Result<Vec<Arc<User>>>> {
595 self.load_users(proto::FuzzySearchUsers { query }, cx)
596 }
597
598 pub fn get_user(
599 &mut self,
600 user_id: u64,
601 cx: &mut ModelContext<Self>,
602 ) -> Task<Result<Arc<User>>> {
603 if let Some(user) = self.users.get(&user_id).cloned() {
604 return cx.foreground().spawn(async move { Ok(user) });
605 }
606
607 let load_users = self.get_users(vec![user_id], cx);
608 cx.spawn(|this, mut cx| async move {
609 load_users.await?;
610 this.update(&mut cx, |this, _| {
611 this.users
612 .get(&user_id)
613 .cloned()
614 .ok_or_else(|| anyhow!("server responded with no users"))
615 })
616 })
617 }
618
619 pub fn current_user(&self) -> Option<Arc<User>> {
620 self.current_user.borrow().clone()
621 }
622
623 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
624 self.current_user.clone()
625 }
626
627 fn load_users(
628 &mut self,
629 request: impl RequestMessage<Response = UsersResponse>,
630 cx: &mut ModelContext<Self>,
631 ) -> Task<Result<Vec<Arc<User>>>> {
632 let client = self.client.clone();
633 let http = self.http.clone();
634 cx.spawn_weak(|this, mut cx| async move {
635 if let Some(rpc) = client.upgrade() {
636 let response = rpc.request(request).await.context("error loading users")?;
637 let users = future::join_all(
638 response
639 .users
640 .into_iter()
641 .map(|user| User::new(user, http.as_ref())),
642 )
643 .await;
644
645 if let Some(this) = this.upgrade(&cx) {
646 this.update(&mut cx, |this, _| {
647 for user in &users {
648 this.users.insert(user.id, user.clone());
649 }
650 });
651 }
652 Ok(users)
653 } else {
654 Ok(Vec::new())
655 }
656 })
657 }
658
659 pub fn set_participant_indices(
660 &mut self,
661 participant_indices: HashMap<u64, ParticipantIndex>,
662 cx: &mut ModelContext<Self>,
663 ) {
664 if participant_indices != self.participant_indices {
665 self.participant_indices = participant_indices;
666 cx.emit(Event::ParticipantIndicesChanged);
667 }
668 }
669
670 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
671 &self.participant_indices
672 }
673}
674
675impl User {
676 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
677 Arc::new(User {
678 id: message.id,
679 github_login: message.github_login,
680 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
681 })
682 }
683}
684
685impl Contact {
686 async fn from_proto(
687 contact: proto::Contact,
688 user_store: &ModelHandle<UserStore>,
689 cx: &mut AsyncAppContext,
690 ) -> Result<Self> {
691 let user = user_store
692 .update(cx, |user_store, cx| {
693 user_store.get_user(contact.user_id, cx)
694 })
695 .await?;
696 Ok(Self {
697 user,
698 online: contact.online,
699 busy: contact.busy,
700 })
701 }
702}
703
704impl Collaborator {
705 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
706 Ok(Self {
707 peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
708 replica_id: message.replica_id as ReplicaId,
709 user_id: message.user_id as UserId,
710 })
711 }
712}
713
714async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
715 let mut response = http
716 .get(url, Default::default(), true)
717 .await
718 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
719
720 if !response.status().is_success() {
721 return Err(anyhow!("avatar request failed {:?}", response.status()));
722 }
723
724 let mut body = Vec::new();
725 response
726 .body_mut()
727 .read_to_end(&mut body)
728 .await
729 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
730 let format = image::guess_format(&body)?;
731 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
732 Ok(ImageData::new(image))
733}