1use super::{proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use feature_flags::FeatureFlagAppExt;
5use futures::{channel::mpsc, Future, StreamExt};
6use gpui::{
7 AppContext, AsyncAppContext, EventEmitter, Model, ModelContext, SharedString, SharedUri, Task,
8 WeakModel,
9};
10use postage::{sink::Sink, watch};
11use rpc::proto::{RequestMessage, UsersResponse};
12use std::sync::{Arc, Weak};
13use text::ReplicaId;
14use util::TryFutureExt as _;
15
16pub type UserId = u64;
17
18#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
19pub struct ChannelId(pub u64);
20
21impl std::fmt::Display for ChannelId {
22 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 self.0.fmt(f)
24 }
25}
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub struct ParticipantIndex(pub u32);
29
30#[derive(Default, Debug)]
31pub struct User {
32 pub id: UserId,
33 pub github_login: String,
34 pub avatar_uri: SharedUri,
35}
36
37#[derive(Clone, Debug, PartialEq, Eq)]
38pub struct Collaborator {
39 pub peer_id: proto::PeerId,
40 pub replica_id: ReplicaId,
41 pub user_id: UserId,
42}
43
44impl PartialOrd for User {
45 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
46 Some(self.cmp(other))
47 }
48}
49
50impl Ord for User {
51 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
52 self.github_login.cmp(&other.github_login)
53 }
54}
55
56impl PartialEq for User {
57 fn eq(&self, other: &Self) -> bool {
58 self.id == other.id && self.github_login == other.github_login
59 }
60}
61
62impl Eq for User {}
63
64#[derive(Debug, PartialEq)]
65pub struct Contact {
66 pub user: Arc<User>,
67 pub online: bool,
68 pub busy: bool,
69}
70
71#[derive(Debug, Clone, Copy, PartialEq, Eq)]
72pub enum ContactRequestStatus {
73 None,
74 RequestSent,
75 RequestReceived,
76 RequestAccepted,
77}
78
79pub struct UserStore {
80 users: HashMap<u64, Arc<User>>,
81 participant_indices: HashMap<u64, ParticipantIndex>,
82 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
83 current_user: watch::Receiver<Option<Arc<User>>>,
84 contacts: Vec<Arc<Contact>>,
85 incoming_contact_requests: Vec<Arc<User>>,
86 outgoing_contact_requests: Vec<Arc<User>>,
87 pending_contact_requests: HashMap<u64, usize>,
88 invite_info: Option<InviteInfo>,
89 client: Weak<Client>,
90 _maintain_contacts: Task<()>,
91 _maintain_current_user: Task<Result<()>>,
92 weak_self: WeakModel<Self>,
93}
94
95#[derive(Clone)]
96pub struct InviteInfo {
97 pub count: u32,
98 pub url: Arc<str>,
99}
100
101pub enum Event {
102 Contact {
103 user: Arc<User>,
104 kind: ContactEventKind,
105 },
106 ShowContacts,
107 ParticipantIndicesChanged,
108}
109
110#[derive(Clone, Copy)]
111pub enum ContactEventKind {
112 Requested,
113 Accepted,
114 Cancelled,
115}
116
117impl EventEmitter<Event> for UserStore {}
118
119enum UpdateContacts {
120 Update(proto::UpdateContacts),
121 Wait(postage::barrier::Sender),
122 Clear(postage::barrier::Sender),
123}
124
125impl UserStore {
126 pub fn new(client: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
127 let (mut current_user_tx, current_user_rx) = watch::channel();
128 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
129 let rpc_subscriptions = vec![
130 client.add_message_handler(cx.weak_model(), Self::handle_update_contacts),
131 client.add_message_handler(cx.weak_model(), Self::handle_update_invite_info),
132 client.add_message_handler(cx.weak_model(), Self::handle_show_contacts),
133 ];
134 Self {
135 users: Default::default(),
136 current_user: current_user_rx,
137 contacts: Default::default(),
138 incoming_contact_requests: Default::default(),
139 participant_indices: Default::default(),
140 outgoing_contact_requests: Default::default(),
141 invite_info: None,
142 client: Arc::downgrade(&client),
143 update_contacts_tx,
144 _maintain_contacts: cx.spawn(|this, mut cx| async move {
145 let _subscriptions = rpc_subscriptions;
146 while let Some(message) = update_contacts_rx.next().await {
147 if let Ok(task) =
148 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
149 {
150 task.log_err().await;
151 } else {
152 break;
153 }
154 }
155 }),
156 _maintain_current_user: cx.spawn(|this, mut cx| async move {
157 let mut status = client.status();
158 let weak = Arc::downgrade(&client);
159 drop(client);
160 while let Some(status) = status.next().await {
161 // if the client is dropped, the app is shutting down.
162 let Some(client) = weak.upgrade() else {
163 return Ok(());
164 };
165 match status {
166 Status::Connected { .. } => {
167 if let Some(user_id) = client.user_id() {
168 let fetch_user = if let Ok(fetch_user) = this
169 .update(&mut cx, |this, cx| {
170 this.get_user(user_id, cx).log_err()
171 }) {
172 fetch_user
173 } else {
174 break;
175 };
176 let fetch_metrics_id =
177 client.request(proto::GetPrivateUserInfo {}).log_err();
178 let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
179
180 cx.update(|cx| {
181 if let Some(info) = info {
182 cx.update_flags(info.staff, info.flags);
183 client.telemetry.set_authenticated_user_info(
184 Some(info.metrics_id.clone()),
185 info.staff,
186 )
187 }
188 })?;
189
190 current_user_tx.send(user).await.ok();
191
192 this.update(&mut cx, |_, cx| cx.notify())?;
193 }
194 }
195 Status::SignedOut => {
196 current_user_tx.send(None).await.ok();
197 this.update(&mut cx, |this, cx| {
198 cx.notify();
199 this.clear_contacts()
200 })?
201 .await;
202 }
203 Status::ConnectionLost => {
204 this.update(&mut cx, |this, cx| {
205 cx.notify();
206 this.clear_contacts()
207 })?
208 .await;
209 }
210 _ => {}
211 }
212 }
213 Ok(())
214 }),
215 pending_contact_requests: Default::default(),
216 weak_self: cx.weak_model(),
217 }
218 }
219
220 #[cfg(feature = "test-support")]
221 pub fn clear_cache(&mut self) {
222 self.users.clear();
223 }
224
225 async fn handle_update_invite_info(
226 this: Model<Self>,
227 message: TypedEnvelope<proto::UpdateInviteInfo>,
228 _: Arc<Client>,
229 mut cx: AsyncAppContext,
230 ) -> Result<()> {
231 this.update(&mut cx, |this, cx| {
232 this.invite_info = Some(InviteInfo {
233 url: Arc::from(message.payload.url),
234 count: message.payload.count,
235 });
236 cx.notify();
237 })?;
238 Ok(())
239 }
240
241 async fn handle_show_contacts(
242 this: Model<Self>,
243 _: TypedEnvelope<proto::ShowContacts>,
244 _: Arc<Client>,
245 mut cx: AsyncAppContext,
246 ) -> Result<()> {
247 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
248 Ok(())
249 }
250
251 pub fn invite_info(&self) -> Option<&InviteInfo> {
252 self.invite_info.as_ref()
253 }
254
255 async fn handle_update_contacts(
256 this: Model<Self>,
257 message: TypedEnvelope<proto::UpdateContacts>,
258 _: Arc<Client>,
259 mut cx: AsyncAppContext,
260 ) -> Result<()> {
261 this.update(&mut cx, |this, _| {
262 this.update_contacts_tx
263 .unbounded_send(UpdateContacts::Update(message.payload))
264 .unwrap();
265 })?;
266 Ok(())
267 }
268
269 fn update_contacts(
270 &mut self,
271 message: UpdateContacts,
272 cx: &mut ModelContext<Self>,
273 ) -> Task<Result<()>> {
274 match message {
275 UpdateContacts::Wait(barrier) => {
276 drop(barrier);
277 Task::ready(Ok(()))
278 }
279 UpdateContacts::Clear(barrier) => {
280 self.contacts.clear();
281 self.incoming_contact_requests.clear();
282 self.outgoing_contact_requests.clear();
283 drop(barrier);
284 Task::ready(Ok(()))
285 }
286 UpdateContacts::Update(message) => {
287 let mut user_ids = HashSet::default();
288 for contact in &message.contacts {
289 user_ids.insert(contact.user_id);
290 }
291 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
292 user_ids.extend(message.outgoing_requests.iter());
293
294 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
295 cx.spawn(|this, mut cx| async move {
296 load_users.await?;
297
298 // Users are fetched in parallel above and cached in call to get_users
299 // No need to parallelize here
300 let mut updated_contacts = Vec::new();
301 let this = this
302 .upgrade()
303 .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
304 for contact in message.contacts {
305 updated_contacts.push(Arc::new(
306 Contact::from_proto(contact, &this, &mut cx).await?,
307 ));
308 }
309
310 let mut incoming_requests = Vec::new();
311 for request in message.incoming_requests {
312 incoming_requests.push({
313 this.update(&mut cx, |this, cx| {
314 this.get_user(request.requester_id, cx)
315 })?
316 .await?
317 });
318 }
319
320 let mut outgoing_requests = Vec::new();
321 for requested_user_id in message.outgoing_requests {
322 outgoing_requests.push(
323 this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
324 .await?,
325 );
326 }
327
328 let removed_contacts =
329 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
330 let removed_incoming_requests =
331 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
332 let removed_outgoing_requests =
333 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
334
335 this.update(&mut cx, |this, cx| {
336 // Remove contacts
337 this.contacts
338 .retain(|contact| !removed_contacts.contains(&contact.user.id));
339 // Update existing contacts and insert new ones
340 for updated_contact in updated_contacts {
341 match this.contacts.binary_search_by_key(
342 &&updated_contact.user.github_login,
343 |contact| &contact.user.github_login,
344 ) {
345 Ok(ix) => this.contacts[ix] = updated_contact,
346 Err(ix) => this.contacts.insert(ix, updated_contact),
347 }
348 }
349
350 // Remove incoming contact requests
351 this.incoming_contact_requests.retain(|user| {
352 if removed_incoming_requests.contains(&user.id) {
353 cx.emit(Event::Contact {
354 user: user.clone(),
355 kind: ContactEventKind::Cancelled,
356 });
357 false
358 } else {
359 true
360 }
361 });
362 // Update existing incoming requests and insert new ones
363 for user in incoming_requests {
364 match this
365 .incoming_contact_requests
366 .binary_search_by_key(&&user.github_login, |contact| {
367 &contact.github_login
368 }) {
369 Ok(ix) => this.incoming_contact_requests[ix] = user,
370 Err(ix) => this.incoming_contact_requests.insert(ix, user),
371 }
372 }
373
374 // Remove outgoing contact requests
375 this.outgoing_contact_requests
376 .retain(|user| !removed_outgoing_requests.contains(&user.id));
377 // Update existing incoming requests and insert new ones
378 for request in outgoing_requests {
379 match this
380 .outgoing_contact_requests
381 .binary_search_by_key(&&request.github_login, |contact| {
382 &contact.github_login
383 }) {
384 Ok(ix) => this.outgoing_contact_requests[ix] = request,
385 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
386 }
387 }
388
389 cx.notify();
390 })?;
391
392 Ok(())
393 })
394 }
395 }
396 }
397
398 pub fn contacts(&self) -> &[Arc<Contact>] {
399 &self.contacts
400 }
401
402 pub fn has_contact(&self, user: &Arc<User>) -> bool {
403 self.contacts
404 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
405 .is_ok()
406 }
407
408 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
409 &self.incoming_contact_requests
410 }
411
412 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
413 &self.outgoing_contact_requests
414 }
415
416 pub fn is_contact_request_pending(&self, user: &User) -> bool {
417 self.pending_contact_requests.contains_key(&user.id)
418 }
419
420 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
421 if self
422 .contacts
423 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
424 .is_ok()
425 {
426 ContactRequestStatus::RequestAccepted
427 } else if self
428 .outgoing_contact_requests
429 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
430 .is_ok()
431 {
432 ContactRequestStatus::RequestSent
433 } else if self
434 .incoming_contact_requests
435 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
436 .is_ok()
437 {
438 ContactRequestStatus::RequestReceived
439 } else {
440 ContactRequestStatus::None
441 }
442 }
443
444 pub fn request_contact(
445 &mut self,
446 responder_id: u64,
447 cx: &mut ModelContext<Self>,
448 ) -> Task<Result<()>> {
449 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
450 }
451
452 pub fn remove_contact(
453 &mut self,
454 user_id: u64,
455 cx: &mut ModelContext<Self>,
456 ) -> Task<Result<()>> {
457 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
458 }
459
460 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
461 self.incoming_contact_requests
462 .iter()
463 .any(|user| user.id == user_id)
464 }
465
466 pub fn respond_to_contact_request(
467 &mut self,
468 requester_id: u64,
469 accept: bool,
470 cx: &mut ModelContext<Self>,
471 ) -> Task<Result<()>> {
472 self.perform_contact_request(
473 requester_id,
474 proto::RespondToContactRequest {
475 requester_id,
476 response: if accept {
477 proto::ContactRequestResponse::Accept
478 } else {
479 proto::ContactRequestResponse::Decline
480 } as i32,
481 },
482 cx,
483 )
484 }
485
486 pub fn dismiss_contact_request(
487 &mut self,
488 requester_id: u64,
489 cx: &mut ModelContext<Self>,
490 ) -> Task<Result<()>> {
491 let client = self.client.upgrade();
492 cx.spawn(move |_, _| async move {
493 client
494 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
495 .request(proto::RespondToContactRequest {
496 requester_id,
497 response: proto::ContactRequestResponse::Dismiss as i32,
498 })
499 .await?;
500 Ok(())
501 })
502 }
503
504 fn perform_contact_request<T: RequestMessage>(
505 &mut self,
506 user_id: u64,
507 request: T,
508 cx: &mut ModelContext<Self>,
509 ) -> Task<Result<()>> {
510 let client = self.client.upgrade();
511 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
512 cx.notify();
513
514 cx.spawn(move |this, mut cx| async move {
515 let response = client
516 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
517 .request(request)
518 .await;
519 this.update(&mut cx, |this, cx| {
520 if let Entry::Occupied(mut request_count) =
521 this.pending_contact_requests.entry(user_id)
522 {
523 *request_count.get_mut() -= 1;
524 if *request_count.get() == 0 {
525 request_count.remove();
526 }
527 }
528 cx.notify();
529 })?;
530 response?;
531 Ok(())
532 })
533 }
534
535 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
536 let (tx, mut rx) = postage::barrier::channel();
537 self.update_contacts_tx
538 .unbounded_send(UpdateContacts::Clear(tx))
539 .unwrap();
540 async move {
541 rx.next().await;
542 }
543 }
544
545 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
546 let (tx, mut rx) = postage::barrier::channel();
547 self.update_contacts_tx
548 .unbounded_send(UpdateContacts::Wait(tx))
549 .unwrap();
550 async move {
551 rx.next().await;
552 }
553 }
554
555 pub fn get_users(
556 &mut self,
557 user_ids: Vec<u64>,
558 cx: &mut ModelContext<Self>,
559 ) -> Task<Result<Vec<Arc<User>>>> {
560 let mut user_ids_to_fetch = user_ids.clone();
561 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
562
563 cx.spawn(|this, mut cx| async move {
564 if !user_ids_to_fetch.is_empty() {
565 this.update(&mut cx, |this, cx| {
566 this.load_users(
567 proto::GetUsers {
568 user_ids: user_ids_to_fetch,
569 },
570 cx,
571 )
572 })?
573 .await?;
574 }
575
576 this.update(&mut cx, |this, _| {
577 user_ids
578 .iter()
579 .map(|user_id| {
580 this.users
581 .get(user_id)
582 .cloned()
583 .ok_or_else(|| anyhow!("user {} not found", user_id))
584 })
585 .collect()
586 })?
587 })
588 }
589
590 pub fn fuzzy_search_users(
591 &mut self,
592 query: String,
593 cx: &mut ModelContext<Self>,
594 ) -> Task<Result<Vec<Arc<User>>>> {
595 self.load_users(proto::FuzzySearchUsers { query }, cx)
596 }
597
598 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
599 self.users.get(&user_id).cloned()
600 }
601
602 pub fn get_user_optimistic(
603 &mut self,
604 user_id: u64,
605 cx: &mut ModelContext<Self>,
606 ) -> Option<Arc<User>> {
607 if let Some(user) = self.users.get(&user_id).cloned() {
608 return Some(user);
609 }
610
611 self.get_user(user_id, cx).detach_and_log_err(cx);
612 None
613 }
614
615 pub fn get_user(
616 &mut self,
617 user_id: u64,
618 cx: &mut ModelContext<Self>,
619 ) -> Task<Result<Arc<User>>> {
620 if let Some(user) = self.users.get(&user_id).cloned() {
621 return Task::ready(Ok(user));
622 }
623
624 let load_users = self.get_users(vec![user_id], cx);
625 cx.spawn(move |this, mut cx| async move {
626 load_users.await?;
627 this.update(&mut cx, |this, _| {
628 this.users
629 .get(&user_id)
630 .cloned()
631 .ok_or_else(|| anyhow!("server responded with no users"))
632 })?
633 })
634 }
635
636 pub fn current_user(&self) -> Option<Arc<User>> {
637 self.current_user.borrow().clone()
638 }
639
640 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
641 self.current_user.clone()
642 }
643
644 fn load_users(
645 &mut self,
646 request: impl RequestMessage<Response = UsersResponse>,
647 cx: &mut ModelContext<Self>,
648 ) -> Task<Result<Vec<Arc<User>>>> {
649 let client = self.client.clone();
650 cx.spawn(|this, mut cx| async move {
651 if let Some(rpc) = client.upgrade() {
652 let response = rpc.request(request).await.context("error loading users")?;
653 let users = response
654 .users
655 .into_iter()
656 .map(|user| User::new(user))
657 .collect::<Vec<_>>();
658
659 this.update(&mut cx, |this, _| {
660 for user in &users {
661 this.users.insert(user.id, user.clone());
662 }
663 })
664 .ok();
665
666 Ok(users)
667 } else {
668 Ok(Vec::new())
669 }
670 })
671 }
672
673 pub fn set_participant_indices(
674 &mut self,
675 participant_indices: HashMap<u64, ParticipantIndex>,
676 cx: &mut ModelContext<Self>,
677 ) {
678 if participant_indices != self.participant_indices {
679 self.participant_indices = participant_indices;
680 cx.emit(Event::ParticipantIndicesChanged);
681 }
682 }
683
684 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
685 &self.participant_indices
686 }
687
688 pub fn participant_names(
689 &self,
690 user_ids: impl Iterator<Item = u64>,
691 cx: &AppContext,
692 ) -> HashMap<u64, SharedString> {
693 let mut ret = HashMap::default();
694 let mut missing_user_ids = Vec::new();
695 for id in user_ids {
696 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
697 ret.insert(id, github_login.into());
698 } else {
699 missing_user_ids.push(id)
700 }
701 }
702 if !missing_user_ids.is_empty() {
703 let this = self.weak_self.clone();
704 cx.spawn(|mut cx| async move {
705 this.update(&mut cx, |this, cx| this.get_users(missing_user_ids, cx))?
706 .await
707 })
708 .detach_and_log_err(cx);
709 }
710 ret
711 }
712}
713
714impl User {
715 fn new(message: proto::User) -> Arc<Self> {
716 Arc::new(User {
717 id: message.id,
718 github_login: message.github_login,
719 avatar_uri: message.avatar_url.into(),
720 })
721 }
722}
723
724impl Contact {
725 async fn from_proto(
726 contact: proto::Contact,
727 user_store: &Model<UserStore>,
728 cx: &mut AsyncAppContext,
729 ) -> Result<Self> {
730 let user = user_store
731 .update(cx, |user_store, cx| {
732 user_store.get_user(contact.user_id, cx)
733 })?
734 .await?;
735 Ok(Self {
736 user,
737 online: contact.online,
738 busy: contact.busy,
739 })
740 }
741}
742
743impl Collaborator {
744 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
745 Ok(Self {
746 peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
747 replica_id: message.replica_id as ReplicaId,
748 user_id: message.user_id as UserId,
749 })
750 }
751}