1use super::{proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use feature_flags::FeatureFlagAppExt;
5use futures::{channel::mpsc, Future, StreamExt};
6use gpui::{AsyncAppContext, EventEmitter, Model, ModelContext, SharedUrl, Task};
7use postage::{sink::Sink, watch};
8use rpc::proto::{RequestMessage, UsersResponse};
9use std::sync::{Arc, Weak};
10use text::ReplicaId;
11use util::TryFutureExt as _;
12
13pub type UserId = u64;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub struct ParticipantIndex(pub u32);
17
18#[derive(Default, Debug)]
19pub struct User {
20 pub id: UserId,
21 pub github_login: String,
22 pub avatar_uri: SharedUrl,
23}
24
25#[derive(Clone, Debug, PartialEq, Eq)]
26pub struct Collaborator {
27 pub peer_id: proto::PeerId,
28 pub replica_id: ReplicaId,
29 pub user_id: UserId,
30}
31
32impl PartialOrd for User {
33 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
34 Some(self.cmp(other))
35 }
36}
37
38impl Ord for User {
39 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
40 self.github_login.cmp(&other.github_login)
41 }
42}
43
44impl PartialEq for User {
45 fn eq(&self, other: &Self) -> bool {
46 self.id == other.id && self.github_login == other.github_login
47 }
48}
49
50impl Eq for User {}
51
52#[derive(Debug, PartialEq)]
53pub struct Contact {
54 pub user: Arc<User>,
55 pub online: bool,
56 pub busy: bool,
57}
58
59#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60pub enum ContactRequestStatus {
61 None,
62 RequestSent,
63 RequestReceived,
64 RequestAccepted,
65}
66
67pub struct UserStore {
68 users: HashMap<u64, Arc<User>>,
69 participant_indices: HashMap<u64, ParticipantIndex>,
70 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
71 current_user: watch::Receiver<Option<Arc<User>>>,
72 contacts: Vec<Arc<Contact>>,
73 incoming_contact_requests: Vec<Arc<User>>,
74 outgoing_contact_requests: Vec<Arc<User>>,
75 pending_contact_requests: HashMap<u64, usize>,
76 invite_info: Option<InviteInfo>,
77 client: Weak<Client>,
78 _maintain_contacts: Task<()>,
79 _maintain_current_user: Task<Result<()>>,
80}
81
82#[derive(Clone)]
83pub struct InviteInfo {
84 pub count: u32,
85 pub url: Arc<str>,
86}
87
88pub enum Event {
89 Contact {
90 user: Arc<User>,
91 kind: ContactEventKind,
92 },
93 ShowContacts,
94 ParticipantIndicesChanged,
95}
96
97#[derive(Clone, Copy)]
98pub enum ContactEventKind {
99 Requested,
100 Accepted,
101 Cancelled,
102}
103
104impl EventEmitter<Event> for UserStore {}
105
106enum UpdateContacts {
107 Update(proto::UpdateContacts),
108 Wait(postage::barrier::Sender),
109 Clear(postage::barrier::Sender),
110}
111
112impl UserStore {
113 pub fn new(client: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
114 let (mut current_user_tx, current_user_rx) = watch::channel();
115 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
116 let rpc_subscriptions = vec![
117 client.add_message_handler(cx.weak_model(), Self::handle_update_contacts),
118 client.add_message_handler(cx.weak_model(), Self::handle_update_invite_info),
119 client.add_message_handler(cx.weak_model(), Self::handle_show_contacts),
120 ];
121 Self {
122 users: Default::default(),
123 current_user: current_user_rx,
124 contacts: Default::default(),
125 incoming_contact_requests: Default::default(),
126 participant_indices: Default::default(),
127 outgoing_contact_requests: Default::default(),
128 invite_info: None,
129 client: Arc::downgrade(&client),
130 update_contacts_tx,
131 _maintain_contacts: cx.spawn(|this, mut cx| async move {
132 let _subscriptions = rpc_subscriptions;
133 while let Some(message) = update_contacts_rx.next().await {
134 if let Ok(task) =
135 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
136 {
137 task.log_err().await;
138 } else {
139 break;
140 }
141 }
142 }),
143 _maintain_current_user: cx.spawn(|this, mut cx| async move {
144 let mut status = client.status();
145 while let Some(status) = status.next().await {
146 match status {
147 Status::Connected { .. } => {
148 if let Some(user_id) = client.user_id() {
149 let fetch_user = if let Ok(fetch_user) = this
150 .update(&mut cx, |this, cx| {
151 this.get_user(user_id, cx).log_err()
152 }) {
153 fetch_user
154 } else {
155 break;
156 };
157 let fetch_metrics_id =
158 client.request(proto::GetPrivateUserInfo {}).log_err();
159 let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
160
161 cx.update(|cx| {
162 if let Some(info) = info {
163 cx.update_flags(info.staff, info.flags);
164 client.telemetry.set_authenticated_user_info(
165 Some(info.metrics_id.clone()),
166 info.staff,
167 )
168 }
169 })?;
170
171 current_user_tx.send(user).await.ok();
172
173 this.update(&mut cx, |_, cx| cx.notify())?;
174 }
175 }
176 Status::SignedOut => {
177 current_user_tx.send(None).await.ok();
178 this.update(&mut cx, |this, cx| {
179 cx.notify();
180 this.clear_contacts()
181 })?
182 .await;
183 }
184 Status::ConnectionLost => {
185 this.update(&mut cx, |this, cx| {
186 cx.notify();
187 this.clear_contacts()
188 })?
189 .await;
190 }
191 _ => {}
192 }
193 }
194 Ok(())
195 }),
196 pending_contact_requests: Default::default(),
197 }
198 }
199
200 #[cfg(feature = "test-support")]
201 pub fn clear_cache(&mut self) {
202 self.users.clear();
203 }
204
205 async fn handle_update_invite_info(
206 this: Model<Self>,
207 message: TypedEnvelope<proto::UpdateInviteInfo>,
208 _: Arc<Client>,
209 mut cx: AsyncAppContext,
210 ) -> Result<()> {
211 this.update(&mut cx, |this, cx| {
212 this.invite_info = Some(InviteInfo {
213 url: Arc::from(message.payload.url),
214 count: message.payload.count,
215 });
216 cx.notify();
217 })?;
218 Ok(())
219 }
220
221 async fn handle_show_contacts(
222 this: Model<Self>,
223 _: TypedEnvelope<proto::ShowContacts>,
224 _: Arc<Client>,
225 mut cx: AsyncAppContext,
226 ) -> Result<()> {
227 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
228 Ok(())
229 }
230
231 pub fn invite_info(&self) -> Option<&InviteInfo> {
232 self.invite_info.as_ref()
233 }
234
235 async fn handle_update_contacts(
236 this: Model<Self>,
237 message: TypedEnvelope<proto::UpdateContacts>,
238 _: Arc<Client>,
239 mut cx: AsyncAppContext,
240 ) -> Result<()> {
241 this.update(&mut cx, |this, _| {
242 this.update_contacts_tx
243 .unbounded_send(UpdateContacts::Update(message.payload))
244 .unwrap();
245 })?;
246 Ok(())
247 }
248
249 fn update_contacts(
250 &mut self,
251 message: UpdateContacts,
252 cx: &mut ModelContext<Self>,
253 ) -> Task<Result<()>> {
254 match message {
255 UpdateContacts::Wait(barrier) => {
256 drop(barrier);
257 Task::ready(Ok(()))
258 }
259 UpdateContacts::Clear(barrier) => {
260 self.contacts.clear();
261 self.incoming_contact_requests.clear();
262 self.outgoing_contact_requests.clear();
263 drop(barrier);
264 Task::ready(Ok(()))
265 }
266 UpdateContacts::Update(message) => {
267 let mut user_ids = HashSet::default();
268 for contact in &message.contacts {
269 user_ids.insert(contact.user_id);
270 }
271 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
272 user_ids.extend(message.outgoing_requests.iter());
273
274 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
275 cx.spawn(|this, mut cx| async move {
276 load_users.await?;
277
278 // Users are fetched in parallel above and cached in call to get_users
279 // No need to paralellize here
280 let mut updated_contacts = Vec::new();
281 let this = this
282 .upgrade()
283 .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
284 for contact in message.contacts {
285 updated_contacts.push(Arc::new(
286 Contact::from_proto(contact, &this, &mut cx).await?,
287 ));
288 }
289
290 let mut incoming_requests = Vec::new();
291 for request in message.incoming_requests {
292 incoming_requests.push({
293 this.update(&mut cx, |this, cx| {
294 this.get_user(request.requester_id, cx)
295 })?
296 .await?
297 });
298 }
299
300 let mut outgoing_requests = Vec::new();
301 for requested_user_id in message.outgoing_requests {
302 outgoing_requests.push(
303 this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
304 .await?,
305 );
306 }
307
308 let removed_contacts =
309 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
310 let removed_incoming_requests =
311 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
312 let removed_outgoing_requests =
313 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
314
315 this.update(&mut cx, |this, cx| {
316 // Remove contacts
317 this.contacts
318 .retain(|contact| !removed_contacts.contains(&contact.user.id));
319 // Update existing contacts and insert new ones
320 for updated_contact in updated_contacts {
321 match this.contacts.binary_search_by_key(
322 &&updated_contact.user.github_login,
323 |contact| &contact.user.github_login,
324 ) {
325 Ok(ix) => this.contacts[ix] = updated_contact,
326 Err(ix) => this.contacts.insert(ix, updated_contact),
327 }
328 }
329
330 // Remove incoming contact requests
331 this.incoming_contact_requests.retain(|user| {
332 if removed_incoming_requests.contains(&user.id) {
333 cx.emit(Event::Contact {
334 user: user.clone(),
335 kind: ContactEventKind::Cancelled,
336 });
337 false
338 } else {
339 true
340 }
341 });
342 // Update existing incoming requests and insert new ones
343 for user in incoming_requests {
344 match this
345 .incoming_contact_requests
346 .binary_search_by_key(&&user.github_login, |contact| {
347 &contact.github_login
348 }) {
349 Ok(ix) => this.incoming_contact_requests[ix] = user,
350 Err(ix) => this.incoming_contact_requests.insert(ix, user),
351 }
352 }
353
354 // Remove outgoing contact requests
355 this.outgoing_contact_requests
356 .retain(|user| !removed_outgoing_requests.contains(&user.id));
357 // Update existing incoming requests and insert new ones
358 for request in outgoing_requests {
359 match this
360 .outgoing_contact_requests
361 .binary_search_by_key(&&request.github_login, |contact| {
362 &contact.github_login
363 }) {
364 Ok(ix) => this.outgoing_contact_requests[ix] = request,
365 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
366 }
367 }
368
369 cx.notify();
370 })?;
371
372 Ok(())
373 })
374 }
375 }
376 }
377
378 pub fn contacts(&self) -> &[Arc<Contact>] {
379 &self.contacts
380 }
381
382 pub fn has_contact(&self, user: &Arc<User>) -> bool {
383 self.contacts
384 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
385 .is_ok()
386 }
387
388 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
389 &self.incoming_contact_requests
390 }
391
392 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
393 &self.outgoing_contact_requests
394 }
395
396 pub fn is_contact_request_pending(&self, user: &User) -> bool {
397 self.pending_contact_requests.contains_key(&user.id)
398 }
399
400 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
401 if self
402 .contacts
403 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
404 .is_ok()
405 {
406 ContactRequestStatus::RequestAccepted
407 } else if self
408 .outgoing_contact_requests
409 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
410 .is_ok()
411 {
412 ContactRequestStatus::RequestSent
413 } else if self
414 .incoming_contact_requests
415 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
416 .is_ok()
417 {
418 ContactRequestStatus::RequestReceived
419 } else {
420 ContactRequestStatus::None
421 }
422 }
423
424 pub fn request_contact(
425 &mut self,
426 responder_id: u64,
427 cx: &mut ModelContext<Self>,
428 ) -> Task<Result<()>> {
429 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
430 }
431
432 pub fn remove_contact(
433 &mut self,
434 user_id: u64,
435 cx: &mut ModelContext<Self>,
436 ) -> Task<Result<()>> {
437 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
438 }
439
440 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
441 self.incoming_contact_requests
442 .iter()
443 .any(|user| user.id == user_id)
444 }
445
446 pub fn respond_to_contact_request(
447 &mut self,
448 requester_id: u64,
449 accept: bool,
450 cx: &mut ModelContext<Self>,
451 ) -> Task<Result<()>> {
452 self.perform_contact_request(
453 requester_id,
454 proto::RespondToContactRequest {
455 requester_id,
456 response: if accept {
457 proto::ContactRequestResponse::Accept
458 } else {
459 proto::ContactRequestResponse::Decline
460 } as i32,
461 },
462 cx,
463 )
464 }
465
466 pub fn dismiss_contact_request(
467 &mut self,
468 requester_id: u64,
469 cx: &mut ModelContext<Self>,
470 ) -> Task<Result<()>> {
471 let client = self.client.upgrade();
472 cx.spawn(move |_, _| async move {
473 client
474 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
475 .request(proto::RespondToContactRequest {
476 requester_id,
477 response: proto::ContactRequestResponse::Dismiss as i32,
478 })
479 .await?;
480 Ok(())
481 })
482 }
483
484 fn perform_contact_request<T: RequestMessage>(
485 &mut self,
486 user_id: u64,
487 request: T,
488 cx: &mut ModelContext<Self>,
489 ) -> Task<Result<()>> {
490 let client = self.client.upgrade();
491 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
492 cx.notify();
493
494 cx.spawn(move |this, mut cx| async move {
495 let response = client
496 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
497 .request(request)
498 .await;
499 this.update(&mut cx, |this, cx| {
500 if let Entry::Occupied(mut request_count) =
501 this.pending_contact_requests.entry(user_id)
502 {
503 *request_count.get_mut() -= 1;
504 if *request_count.get() == 0 {
505 request_count.remove();
506 }
507 }
508 cx.notify();
509 })?;
510 response?;
511 Ok(())
512 })
513 }
514
515 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
516 let (tx, mut rx) = postage::barrier::channel();
517 self.update_contacts_tx
518 .unbounded_send(UpdateContacts::Clear(tx))
519 .unwrap();
520 async move {
521 rx.next().await;
522 }
523 }
524
525 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
526 let (tx, mut rx) = postage::barrier::channel();
527 self.update_contacts_tx
528 .unbounded_send(UpdateContacts::Wait(tx))
529 .unwrap();
530 async move {
531 rx.next().await;
532 }
533 }
534
535 pub fn get_users(
536 &mut self,
537 user_ids: Vec<u64>,
538 cx: &mut ModelContext<Self>,
539 ) -> Task<Result<Vec<Arc<User>>>> {
540 let mut user_ids_to_fetch = user_ids.clone();
541 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
542
543 cx.spawn(|this, mut cx| async move {
544 if !user_ids_to_fetch.is_empty() {
545 this.update(&mut cx, |this, cx| {
546 this.load_users(
547 proto::GetUsers {
548 user_ids: user_ids_to_fetch,
549 },
550 cx,
551 )
552 })?
553 .await?;
554 }
555
556 this.update(&mut cx, |this, _| {
557 user_ids
558 .iter()
559 .map(|user_id| {
560 this.users
561 .get(user_id)
562 .cloned()
563 .ok_or_else(|| anyhow!("user {} not found", user_id))
564 })
565 .collect()
566 })?
567 })
568 }
569
570 pub fn fuzzy_search_users(
571 &mut self,
572 query: String,
573 cx: &mut ModelContext<Self>,
574 ) -> Task<Result<Vec<Arc<User>>>> {
575 self.load_users(proto::FuzzySearchUsers { query }, cx)
576 }
577
578 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
579 self.users.get(&user_id).cloned()
580 }
581
582 pub fn get_user(
583 &mut self,
584 user_id: u64,
585 cx: &mut ModelContext<Self>,
586 ) -> Task<Result<Arc<User>>> {
587 if let Some(user) = self.users.get(&user_id).cloned() {
588 return Task::ready(Ok(user));
589 }
590
591 let load_users = self.get_users(vec![user_id], cx);
592 cx.spawn(move |this, mut cx| async move {
593 load_users.await?;
594 this.update(&mut cx, |this, _| {
595 this.users
596 .get(&user_id)
597 .cloned()
598 .ok_or_else(|| anyhow!("server responded with no users"))
599 })?
600 })
601 }
602
603 pub fn current_user(&self) -> Option<Arc<User>> {
604 self.current_user.borrow().clone()
605 }
606
607 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
608 self.current_user.clone()
609 }
610
611 fn load_users(
612 &mut self,
613 request: impl RequestMessage<Response = UsersResponse>,
614 cx: &mut ModelContext<Self>,
615 ) -> Task<Result<Vec<Arc<User>>>> {
616 let client = self.client.clone();
617 cx.spawn(|this, mut cx| async move {
618 if let Some(rpc) = client.upgrade() {
619 let response = rpc.request(request).await.context("error loading users")?;
620 let users = response
621 .users
622 .into_iter()
623 .map(|user| User::new(user))
624 .collect::<Vec<_>>();
625
626 this.update(&mut cx, |this, _| {
627 for user in &users {
628 this.users.insert(user.id, user.clone());
629 }
630 })
631 .ok();
632
633 Ok(users)
634 } else {
635 Ok(Vec::new())
636 }
637 })
638 }
639
640 pub fn set_participant_indices(
641 &mut self,
642 participant_indices: HashMap<u64, ParticipantIndex>,
643 cx: &mut ModelContext<Self>,
644 ) {
645 if participant_indices != self.participant_indices {
646 self.participant_indices = participant_indices;
647 cx.emit(Event::ParticipantIndicesChanged);
648 }
649 }
650
651 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
652 &self.participant_indices
653 }
654}
655
656impl User {
657 fn new(message: proto::User) -> Arc<Self> {
658 Arc::new(User {
659 id: message.id,
660 github_login: message.github_login,
661 avatar_uri: message.avatar_url.into(),
662 })
663 }
664}
665
666impl Contact {
667 async fn from_proto(
668 contact: proto::Contact,
669 user_store: &Model<UserStore>,
670 cx: &mut AsyncAppContext,
671 ) -> Result<Self> {
672 let user = user_store
673 .update(cx, |user_store, cx| {
674 user_store.get_user(contact.user_id, cx)
675 })?
676 .await?;
677 Ok(Self {
678 user,
679 online: contact.online,
680 busy: contact.busy,
681 })
682 }
683}
684
685impl Collaborator {
686 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
687 Ok(Self {
688 peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
689 replica_id: message.replica_id as ReplicaId,
690 user_id: message.user_id as UserId,
691 })
692 }
693}