1use super::{proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use feature_flags::FeatureFlagAppExt;
5use futures::{channel::mpsc, Future, StreamExt};
6use gpui::{AsyncAppContext, EventEmitter, Model, ModelContext, SharedString, Task};
7use postage::{sink::Sink, watch};
8use rpc::proto::{RequestMessage, UsersResponse};
9use std::sync::{Arc, Weak};
10use text::ReplicaId;
11use util::TryFutureExt as _;
12
/// Numeric identifier of a user; an alias for `u64`.
pub type UserId = u64;
14
/// Newtype wrapper around a `u32` participant index.
///
/// `UserStore` keeps one of these per user id in its `participant_indices` map.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
17
/// A user record as cached by `UserStore`.
///
/// Instances are built from `proto::User` messages by `User::new`.
#[derive(Default, Debug)]
pub struct User {
    /// Id of the user, copied from the `id` field of `proto::User`.
    pub id: UserId,
    /// Login name, copied from the `github_login` field of `proto::User`.
    /// The contact lists in `UserStore` are ordered and searched by this field.
    pub github_login: String,
    /// Avatar location, converted from the `avatar_url` field of `proto::User`.
    pub avatar_uri: SharedString,
}
24
/// A collaborator decoded from a `proto::Collaborator` message
/// (see `Collaborator::from_proto`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id taken from the message; decoding fails when the message has none.
    pub peer_id: proto::PeerId,
    /// Replica id, cast from the message's `replica_id` field.
    pub replica_id: ReplicaId,
    /// User id, cast from the message's `user_id` field.
    pub user_id: UserId,
}
31
32impl PartialOrd for User {
33 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
34 Some(self.cmp(other))
35 }
36}
37
38impl Ord for User {
39 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
40 self.github_login.cmp(&other.github_login)
41 }
42}
43
44impl PartialEq for User {
45 fn eq(&self, other: &Self) -> bool {
46 self.id == other.id && self.github_login == other.github_login
47 }
48}
49
/// Marks the hand-written `PartialEq` for `User` as a full equivalence relation.
impl Eq for User {}
51
/// A contact of the current user, built from a `proto::Contact` message
/// (see `Contact::from_proto`).
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user record resolved from the message's `user_id`.
    pub user: Arc<User>,
    /// Copied from the message's `online` field.
    pub online: bool,
    /// Copied from the message's `busy` field.
    pub busy: bool,
}
58
/// Relationship between the current user and another user, as computed by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the store's contact or request lists.
    None,
    /// The user is in the store's outgoing contact requests.
    RequestSent,
    /// The user is in the store's incoming contact requests.
    RequestReceived,
    /// The user is in the store's contacts.
    RequestAccepted,
}
66
/// Client-side store of users, contacts and contact requests.
pub struct UserStore {
    /// Cache of user records keyed by user id; filled by `load_users`.
    users: HashMap<u64, Arc<User>>,
    /// Participant index per user id; replaced by `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sending half of the queue consumed by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Receiving half of the watch channel written by `_maintain_current_user`.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, ordered by `github_login` so they can be binary-searched.
    contacts: Vec<Arc<Contact>>,
    /// Users who sent a contact request, ordered by `github_login`.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users a contact request was sent to, ordered by `github_login`.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight `perform_contact_request` calls per user id;
    /// an entry is removed once its count drops to zero.
    pending_contact_requests: HashMap<u64, usize>,
    /// Most recent invite info received via `proto::UpdateInviteInfo`, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the client; upgraded whenever a request has to be sent.
    client: Weak<Client>,
    /// Task that applies queued `UpdateContacts` messages; owned by the store.
    _maintain_contacts: Task<()>,
    /// Task that follows the client status and publishes the current user.
    _maintain_current_user: Task<Result<()>>,
}
81
/// Invite information stored from a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// Copied from the message's `count` field.
    pub count: u32,
    /// Converted from the message's `url` field.
    pub url: Arc<str>,
}
87
/// Events emitted by `UserStore`.
pub enum Event {
    /// A contact-related change concerning `user`. `update_contacts` emits this
    /// with `ContactEventKind::Cancelled` when an incoming request is removed.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted by `set_participant_indices` when the stored map actually changes.
    ParticipantIndicesChanged,
}
96
/// Kind of change carried by `Event::Contact`.
///
/// NOTE(review): within `UserStore` only `Cancelled` is emitted; the exact
/// meaning of `Requested` and `Accepted` is defined by other emitters or
/// consumers and should be confirmed there.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    /// Emitted when an incoming contact request is removed by a contacts update.
    Cancelled,
}
103
/// Lets `UserStore` emit [`Event`] values through its model context.
impl EventEmitter<Event> for UserStore {}
105
/// Messages queued on `UserStore::update_contacts_tx` and handled, in order,
/// by `UserStore::update_contacts`.
enum UpdateContacts {
    /// A contacts update payload received from the server.
    Update(proto::UpdateContacts),
    /// A marker: the barrier sender is dropped once this message is reached.
    Wait(postage::barrier::Sender),
    /// Clears contacts and both request lists, then drops the barrier sender.
    Clear(postage::barrier::Sender),
}
111
112impl UserStore {
/// Creates a `UserStore` bound to `client`.
///
/// Registers message handlers for `UpdateContacts`, `UpdateInviteInfo` and
/// `ShowContacts`, and spawns two tasks that are stored on the returned value:
///
/// * `_maintain_contacts` applies queued `UpdateContacts` messages one at a time.
/// * `_maintain_current_user` follows `client.status()` and publishes the
///   current user through the `current_user` watch channel.
pub fn new(client: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
    let (mut current_user_tx, current_user_rx) = watch::channel();
    let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
    let rpc_subscriptions = vec![
        client.add_message_handler(cx.weak_model(), Self::handle_update_contacts),
        client.add_message_handler(cx.weak_model(), Self::handle_update_invite_info),
        client.add_message_handler(cx.weak_model(), Self::handle_show_contacts),
    ];
    Self {
        users: Default::default(),
        current_user: current_user_rx,
        contacts: Default::default(),
        incoming_contact_requests: Default::default(),
        participant_indices: Default::default(),
        outgoing_contact_requests: Default::default(),
        invite_info: None,
        client: Arc::downgrade(&client),
        update_contacts_tx,
        _maintain_contacts: cx.spawn(|this, mut cx| async move {
            // Moved into the task so the subscriptions are dropped only when the task ends.
            let _subscriptions = rpc_subscriptions;
            while let Some(message) = update_contacts_rx.next().await {
                // Each update is awaited to completion before the next message is
                // taken from the queue, so updates are applied strictly in order.
                if let Ok(task) =
                    this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
                {
                    task.log_err().await;
                } else {
                    // The store could not be updated any more; stop processing.
                    break;
                }
            }
        }),
        _maintain_current_user: cx.spawn(|this, mut cx| async move {
            let mut status = client.status();
            while let Some(status) = status.next().await {
                match status {
                    Status::Connected { .. } => {
                        if let Some(user_id) = client.user_id() {
                            let fetch_user = if let Ok(fetch_user) = this
                                .update(&mut cx, |this, cx| {
                                    this.get_user(user_id, cx).log_err()
                                }) {
                                fetch_user
                            } else {
                                break;
                            };
                            let fetch_metrics_id =
                                client.request(proto::GetPrivateUserInfo {}).log_err();
                            // Both lookups run concurrently; each yields `None` on failure
                            // (presumably after `log_err` has logged the error).
                            let (user, info) = futures::join!(fetch_user, fetch_metrics_id);

                            cx.update(|cx| {
                                if let Some(info) = info {
                                    cx.update_flags(info.staff, info.flags);
                                    client.telemetry.set_authenticated_user_info(
                                        Some(info.metrics_id.clone()),
                                        info.staff,
                                        cx,
                                    )
                                }
                            })?;

                            // `user` is `None` when the fetch failed; watchers then see no user.
                            current_user_tx.send(user).await.ok();

                            this.update(&mut cx, |_, cx| cx.notify())?;
                        }
                    }
                    Status::SignedOut => {
                        // Signing out resets the current user and clears all contact state.
                        current_user_tx.send(None).await.ok();
                        this.update(&mut cx, |this, cx| {
                            cx.notify();
                            this.clear_contacts()
                        })?
                        .await;
                    }
                    Status::ConnectionLost => {
                        // The current user is left as is; only contact state is cleared.
                        this.update(&mut cx, |this, cx| {
                            cx.notify();
                            this.clear_contacts()
                        })?
                        .await;
                    }
                    _ => {}
                }
            }
            Ok(())
        }),
        pending_contact_requests: Default::default(),
    }
}
200
/// Empties the cached `users` map. Only compiled with the `test-support` feature.
#[cfg(feature = "test-support")]
pub fn clear_cache(&mut self) {
    self.users.clear();
}
205
206 async fn handle_update_invite_info(
207 this: Model<Self>,
208 message: TypedEnvelope<proto::UpdateInviteInfo>,
209 _: Arc<Client>,
210 mut cx: AsyncAppContext,
211 ) -> Result<()> {
212 this.update(&mut cx, |this, cx| {
213 this.invite_info = Some(InviteInfo {
214 url: Arc::from(message.payload.url),
215 count: message.payload.count,
216 });
217 cx.notify();
218 })?;
219 Ok(())
220 }
221
222 async fn handle_show_contacts(
223 this: Model<Self>,
224 _: TypedEnvelope<proto::ShowContacts>,
225 _: Arc<Client>,
226 mut cx: AsyncAppContext,
227 ) -> Result<()> {
228 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
229 Ok(())
230 }
231
/// Returns the most recently stored invite info, or `None` if none has been received.
pub fn invite_info(&self) -> Option<&InviteInfo> {
    self.invite_info.as_ref()
}
235
236 async fn handle_update_contacts(
237 this: Model<Self>,
238 message: TypedEnvelope<proto::UpdateContacts>,
239 _: Arc<Client>,
240 mut cx: AsyncAppContext,
241 ) -> Result<()> {
242 this.update(&mut cx, |this, _| {
243 this.update_contacts_tx
244 .unbounded_send(UpdateContacts::Update(message.payload))
245 .unwrap();
246 })?;
247 Ok(())
248 }
249
/// Applies one queued [`UpdateContacts`] message to the store.
///
/// * `Wait` drops the barrier sender and returns an already-completed task.
/// * `Clear` empties `contacts`, `incoming_contact_requests` and
///   `outgoing_contact_requests`, then drops the barrier sender.
/// * `Update` loads every user referenced by the payload, then applies the
///   removals and upserts to the three lists (each kept ordered by
///   `github_login`), emits `Event::Contact` with `ContactEventKind::Cancelled`
///   for every removed incoming request, and finally notifies observers.
///
/// The returned task fails if a user cannot be loaded or the store can no
/// longer be upgraded or updated.
fn update_contacts(
    &mut self,
    message: UpdateContacts,
    cx: &mut ModelContext<Self>,
) -> Task<Result<()>> {
    match message {
        UpdateContacts::Wait(barrier) => {
            // Dropping the sender is the whole point of this message; presumably
            // it releases the future returned by `contact_updates_done`.
            drop(barrier);
            Task::ready(Ok(()))
        }
        UpdateContacts::Clear(barrier) => {
            self.contacts.clear();
            self.incoming_contact_requests.clear();
            self.outgoing_contact_requests.clear();
            drop(barrier);
            Task::ready(Ok(()))
        }
        UpdateContacts::Update(message) => {
            // Collect every user id mentioned by the payload, without duplicates.
            let mut user_ids = HashSet::default();
            for contact in &message.contacts {
                user_ids.insert(contact.user_id);
            }
            user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
            user_ids.extend(message.outgoing_requests.iter());

            let load_users = self.get_users(user_ids.into_iter().collect(), cx);
            cx.spawn(|this, mut cx| async move {
                load_users.await?;

                // Users are fetched in a single request above and cached by `get_users`,
                // so there is no need to parallelize the per-user lookups below.
                let mut updated_contacts = Vec::new();
                let this = this
                    .upgrade()
                    .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
                for contact in message.contacts {
                    updated_contacts.push(Arc::new(
                        Contact::from_proto(contact, &this, &mut cx).await?,
                    ));
                }

                let mut incoming_requests = Vec::new();
                for request in message.incoming_requests {
                    incoming_requests.push({
                        this.update(&mut cx, |this, cx| {
                            this.get_user(request.requester_id, cx)
                        })?
                        .await?
                    });
                }

                let mut outgoing_requests = Vec::new();
                for requested_user_id in message.outgoing_requests {
                    outgoing_requests.push(
                        this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
                            .await?,
                    );
                }

                let removed_contacts =
                    HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                let removed_incoming_requests =
                    HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                let removed_outgoing_requests =
                    HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                this.update(&mut cx, |this, cx| {
                    // Remove contacts
                    this.contacts
                        .retain(|contact| !removed_contacts.contains(&contact.user.id));
                    // Update existing contacts and insert new ones
                    for updated_contact in updated_contacts {
                        match this.contacts.binary_search_by_key(
                            &&updated_contact.user.github_login,
                            |contact| &contact.user.github_login,
                        ) {
                            Ok(ix) => this.contacts[ix] = updated_contact,
                            Err(ix) => this.contacts.insert(ix, updated_contact),
                        }
                    }

                    // Remove incoming contact requests, emitting a `Cancelled` event for each
                    this.incoming_contact_requests.retain(|user| {
                        if removed_incoming_requests.contains(&user.id) {
                            cx.emit(Event::Contact {
                                user: user.clone(),
                                kind: ContactEventKind::Cancelled,
                            });
                            false
                        } else {
                            true
                        }
                    });
                    // Update existing incoming requests and insert new ones
                    for user in incoming_requests {
                        match this
                            .incoming_contact_requests
                            .binary_search_by_key(&&user.github_login, |contact| {
                                &contact.github_login
                            }) {
                            Ok(ix) => this.incoming_contact_requests[ix] = user,
                            Err(ix) => this.incoming_contact_requests.insert(ix, user),
                        }
                    }

                    // Remove outgoing contact requests
                    this.outgoing_contact_requests
                        .retain(|user| !removed_outgoing_requests.contains(&user.id));
                    // Update existing outgoing requests and insert new ones
                    for request in outgoing_requests {
                        match this
                            .outgoing_contact_requests
                            .binary_search_by_key(&&request.github_login, |contact| {
                                &contact.github_login
                            }) {
                            Ok(ix) => this.outgoing_contact_requests[ix] = request,
                            Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                        }
                    }

                    cx.notify();
                })?;

                Ok(())
            })
        }
    }
}
378
379 pub fn contacts(&self) -> &[Arc<Contact>] {
380 &self.contacts
381 }
382
383 pub fn has_contact(&self, user: &Arc<User>) -> bool {
384 self.contacts
385 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
386 .is_ok()
387 }
388
389 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
390 &self.incoming_contact_requests
391 }
392
393 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
394 &self.outgoing_contact_requests
395 }
396
/// Returns `true` while at least one contact request concerning `user`
/// (started through `perform_contact_request`) is still in flight.
pub fn is_contact_request_pending(&self, user: &User) -> bool {
    self.pending_contact_requests.contains_key(&user.id)
}
400
401 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
402 if self
403 .contacts
404 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
405 .is_ok()
406 {
407 ContactRequestStatus::RequestAccepted
408 } else if self
409 .outgoing_contact_requests
410 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
411 .is_ok()
412 {
413 ContactRequestStatus::RequestSent
414 } else if self
415 .incoming_contact_requests
416 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
417 .is_ok()
418 {
419 ContactRequestStatus::RequestReceived
420 } else {
421 ContactRequestStatus::None
422 }
423 }
424
425 pub fn request_contact(
426 &mut self,
427 responder_id: u64,
428 cx: &mut ModelContext<Self>,
429 ) -> Task<Result<()>> {
430 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
431 }
432
433 pub fn remove_contact(
434 &mut self,
435 user_id: u64,
436 cx: &mut ModelContext<Self>,
437 ) -> Task<Result<()>> {
438 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
439 }
440
441 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
442 self.incoming_contact_requests
443 .iter()
444 .any(|user| user.id == user_id)
445 }
446
447 pub fn respond_to_contact_request(
448 &mut self,
449 requester_id: u64,
450 accept: bool,
451 cx: &mut ModelContext<Self>,
452 ) -> Task<Result<()>> {
453 self.perform_contact_request(
454 requester_id,
455 proto::RespondToContactRequest {
456 requester_id,
457 response: if accept {
458 proto::ContactRequestResponse::Accept
459 } else {
460 proto::ContactRequestResponse::Decline
461 } as i32,
462 },
463 cx,
464 )
465 }
466
467 pub fn dismiss_contact_request(
468 &mut self,
469 requester_id: u64,
470 cx: &mut ModelContext<Self>,
471 ) -> Task<Result<()>> {
472 let client = self.client.upgrade();
473 cx.spawn(move |_, _| async move {
474 client
475 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
476 .request(proto::RespondToContactRequest {
477 requester_id,
478 response: proto::ContactRequestResponse::Dismiss as i32,
479 })
480 .await?;
481 Ok(())
482 })
483 }
484
485 fn perform_contact_request<T: RequestMessage>(
486 &mut self,
487 user_id: u64,
488 request: T,
489 cx: &mut ModelContext<Self>,
490 ) -> Task<Result<()>> {
491 let client = self.client.upgrade();
492 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
493 cx.notify();
494
495 cx.spawn(move |this, mut cx| async move {
496 let response = client
497 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
498 .request(request)
499 .await;
500 this.update(&mut cx, |this, cx| {
501 if let Entry::Occupied(mut request_count) =
502 this.pending_contact_requests.entry(user_id)
503 {
504 *request_count.get_mut() -= 1;
505 if *request_count.get() == 0 {
506 request_count.remove();
507 }
508 }
509 cx.notify();
510 })?;
511 response?;
512 Ok(())
513 })
514 }
515
516 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
517 let (tx, mut rx) = postage::barrier::channel();
518 self.update_contacts_tx
519 .unbounded_send(UpdateContacts::Clear(tx))
520 .unwrap();
521 async move {
522 rx.next().await;
523 }
524 }
525
526 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
527 let (tx, mut rx) = postage::barrier::channel();
528 self.update_contacts_tx
529 .unbounded_send(UpdateContacts::Wait(tx))
530 .unwrap();
531 async move {
532 rx.next().await;
533 }
534 }
535
536 pub fn get_users(
537 &mut self,
538 user_ids: Vec<u64>,
539 cx: &mut ModelContext<Self>,
540 ) -> Task<Result<Vec<Arc<User>>>> {
541 let mut user_ids_to_fetch = user_ids.clone();
542 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
543
544 cx.spawn(|this, mut cx| async move {
545 if !user_ids_to_fetch.is_empty() {
546 this.update(&mut cx, |this, cx| {
547 this.load_users(
548 proto::GetUsers {
549 user_ids: user_ids_to_fetch,
550 },
551 cx,
552 )
553 })?
554 .await?;
555 }
556
557 this.update(&mut cx, |this, _| {
558 user_ids
559 .iter()
560 .map(|user_id| {
561 this.users
562 .get(user_id)
563 .cloned()
564 .ok_or_else(|| anyhow!("user {} not found", user_id))
565 })
566 .collect()
567 })?
568 })
569 }
570
571 pub fn fuzzy_search_users(
572 &mut self,
573 query: String,
574 cx: &mut ModelContext<Self>,
575 ) -> Task<Result<Vec<Arc<User>>>> {
576 self.load_users(proto::FuzzySearchUsers { query }, cx)
577 }
578
579 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
580 self.users.get(&user_id).cloned()
581 }
582
583 pub fn get_user(
584 &mut self,
585 user_id: u64,
586 cx: &mut ModelContext<Self>,
587 ) -> Task<Result<Arc<User>>> {
588 if let Some(user) = self.users.get(&user_id).cloned() {
589 return Task::ready(Ok(user));
590 }
591
592 let load_users = self.get_users(vec![user_id], cx);
593 cx.spawn(move |this, mut cx| async move {
594 load_users.await?;
595 this.update(&mut cx, |this, _| {
596 this.users
597 .get(&user_id)
598 .cloned()
599 .ok_or_else(|| anyhow!("server responded with no users"))
600 })?
601 })
602 }
603
/// Returns the current value of the `current_user` watch channel: the signed-in
/// user, or `None` when no user has been published.
pub fn current_user(&self) -> Option<Arc<User>> {
    self.current_user.borrow().clone()
}
607
/// Returns a clone of the watch receiver for the current user, so callers can
/// observe changes themselves.
pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
    self.current_user.clone()
}
611
612 fn load_users(
613 &mut self,
614 request: impl RequestMessage<Response = UsersResponse>,
615 cx: &mut ModelContext<Self>,
616 ) -> Task<Result<Vec<Arc<User>>>> {
617 let client = self.client.clone();
618 cx.spawn(|this, mut cx| async move {
619 if let Some(rpc) = client.upgrade() {
620 let response = rpc.request(request).await.context("error loading users")?;
621 let users = response
622 .users
623 .into_iter()
624 .map(|user| User::new(user))
625 .collect::<Vec<_>>();
626
627 this.update(&mut cx, |this, _| {
628 for user in &users {
629 this.users.insert(user.id, user.clone());
630 }
631 })
632 .ok();
633
634 Ok(users)
635 } else {
636 Ok(Vec::new())
637 }
638 })
639 }
640
641 pub fn set_participant_indices(
642 &mut self,
643 participant_indices: HashMap<u64, ParticipantIndex>,
644 cx: &mut ModelContext<Self>,
645 ) {
646 if participant_indices != self.participant_indices {
647 self.participant_indices = participant_indices;
648 cx.emit(Event::ParticipantIndicesChanged);
649 }
650 }
651
/// Returns the current participant index map, keyed by user id.
pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
    &self.participant_indices
}
655}
656
657impl User {
658 fn new(message: proto::User) -> Arc<Self> {
659 Arc::new(User {
660 id: message.id,
661 github_login: message.github_login,
662 avatar_uri: message.avatar_url.into(),
663 })
664 }
665}
666
667impl Contact {
668 async fn from_proto(
669 contact: proto::Contact,
670 user_store: &Model<UserStore>,
671 cx: &mut AsyncAppContext,
672 ) -> Result<Self> {
673 let user = user_store
674 .update(cx, |user_store, cx| {
675 user_store.get_user(contact.user_id, cx)
676 })?
677 .await?;
678 Ok(Self {
679 user,
680 online: contact.online,
681 busy: contact.busy,
682 })
683 }
684}
685
686impl Collaborator {
687 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
688 Ok(Self {
689 peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
690 replica_id: message.replica_id as ReplicaId,
691 user_id: message.user_id as UserId,
692 })
693 }
694}