1use super::{proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use feature_flags::FeatureFlagAppExt;
5use futures::{channel::mpsc, Future, StreamExt};
6use gpui::{
7 AppContext, AsyncAppContext, EventEmitter, Model, ModelContext, SharedString, SharedUri, Task,
8 WeakModel,
9};
10use postage::{sink::Sink, watch};
11use rpc::proto::{RequestMessage, UsersResponse};
12use std::sync::{Arc, Weak};
13use text::ReplicaId;
14use util::TryFutureExt as _;
15
/// Raw numeric identifier of a user account.
pub type UserId = u64;
17
/// Newtype wrapping the raw `u64` identifier of a channel.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ChannelId(pub u64);

impl std::fmt::Display for ChannelId {
    /// Formats the id exactly like its inner integer, honoring the formatter's flags.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}
26
/// Newtype wrapping the raw `u64` identifier of a project.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ProjectId(pub u64);
29
/// Newtype wrapping the raw `u64` identifier of a dev server.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DevServerId(pub u64);
32
/// Newtype wrapping the raw `u64` identifier of a remote project.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RemoteProjectId(pub u64);
35
/// Newtype wrapping the `u32` index assigned to a participant.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
38
39#[derive(Default, Debug)]
40pub struct User {
41 pub id: UserId,
42 pub github_login: String,
43 pub avatar_uri: SharedUri,
44}
45
46#[derive(Clone, Debug, PartialEq, Eq)]
47pub struct Collaborator {
48 pub peer_id: proto::PeerId,
49 pub replica_id: ReplicaId,
50 pub user_id: UserId,
51}
52
53impl PartialOrd for User {
54 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
55 Some(self.cmp(other))
56 }
57}
58
59impl Ord for User {
60 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
61 self.github_login.cmp(&other.github_login)
62 }
63}
64
65impl PartialEq for User {
66 fn eq(&self, other: &Self) -> bool {
67 self.id == other.id && self.github_login == other.github_login
68 }
69}
70
71impl Eq for User {}
72
73#[derive(Debug, PartialEq)]
74pub struct Contact {
75 pub user: Arc<User>,
76 pub online: bool,
77 pub busy: bool,
78}
79
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the contact or request lists.
    None,
    /// The user is in the outgoing contact request list.
    RequestSent,
    /// The user is in the incoming contact request list.
    RequestReceived,
    /// The user is already a contact.
    RequestAccepted,
}
87
88pub struct UserStore {
89 users: HashMap<u64, Arc<User>>,
90 participant_indices: HashMap<u64, ParticipantIndex>,
91 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
92 current_user: watch::Receiver<Option<Arc<User>>>,
93 contacts: Vec<Arc<Contact>>,
94 incoming_contact_requests: Vec<Arc<User>>,
95 outgoing_contact_requests: Vec<Arc<User>>,
96 pending_contact_requests: HashMap<u64, usize>,
97 invite_info: Option<InviteInfo>,
98 client: Weak<Client>,
99 _maintain_contacts: Task<()>,
100 _maintain_current_user: Task<Result<()>>,
101 weak_self: WeakModel<Self>,
102}
103
/// Invite information taken from an `UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The `count` field of the message.
    pub count: u32,
    /// The `url` field of the message.
    pub url: Arc<str>,
}
109
110pub enum Event {
111 Contact {
112 user: Arc<User>,
113 kind: ContactEventKind,
114 },
115 ShowContacts,
116 ParticipantIndicesChanged,
117}
118
/// Kind of change carried by [`Event::Contact`].
#[derive(Copy, Clone)]
pub enum ContactEventKind {
    /// A contact request was made.
    Requested,
    /// A contact request was accepted.
    Accepted,
    /// A contact request was cancelled.
    Cancelled,
}
125
126impl EventEmitter<Event> for UserStore {}
127
128enum UpdateContacts {
129 Update(proto::UpdateContacts),
130 Wait(postage::barrier::Sender),
131 Clear(postage::barrier::Sender),
132}
133
134impl UserStore {
135 pub fn new(client: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
136 let (mut current_user_tx, current_user_rx) = watch::channel();
137 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
138 let rpc_subscriptions = vec![
139 client.add_message_handler(cx.weak_model(), Self::handle_update_contacts),
140 client.add_message_handler(cx.weak_model(), Self::handle_update_invite_info),
141 client.add_message_handler(cx.weak_model(), Self::handle_show_contacts),
142 ];
143 Self {
144 users: Default::default(),
145 current_user: current_user_rx,
146 contacts: Default::default(),
147 incoming_contact_requests: Default::default(),
148 participant_indices: Default::default(),
149 outgoing_contact_requests: Default::default(),
150 invite_info: None,
151 client: Arc::downgrade(&client),
152 update_contacts_tx,
153 _maintain_contacts: cx.spawn(|this, mut cx| async move {
154 let _subscriptions = rpc_subscriptions;
155 while let Some(message) = update_contacts_rx.next().await {
156 if let Ok(task) =
157 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
158 {
159 task.log_err().await;
160 } else {
161 break;
162 }
163 }
164 }),
165 _maintain_current_user: cx.spawn(|this, mut cx| async move {
166 let mut status = client.status();
167 let weak = Arc::downgrade(&client);
168 drop(client);
169 while let Some(status) = status.next().await {
170 // if the client is dropped, the app is shutting down.
171 let Some(client) = weak.upgrade() else {
172 return Ok(());
173 };
174 match status {
175 Status::Connected { .. } => {
176 if let Some(user_id) = client.user_id() {
177 let fetch_user = if let Ok(fetch_user) = this
178 .update(&mut cx, |this, cx| {
179 this.get_user(user_id, cx).log_err()
180 }) {
181 fetch_user
182 } else {
183 break;
184 };
185 let fetch_metrics_id =
186 client.request(proto::GetPrivateUserInfo {}).log_err();
187 let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
188
189 cx.update(|cx| {
190 if let Some(info) = info {
191 cx.update_flags(info.staff, info.flags);
192 client.telemetry.set_authenticated_user_info(
193 Some(info.metrics_id.clone()),
194 info.staff,
195 )
196 }
197 })?;
198
199 current_user_tx.send(user).await.ok();
200
201 this.update(&mut cx, |_, cx| cx.notify())?;
202 }
203 }
204 Status::SignedOut => {
205 current_user_tx.send(None).await.ok();
206 this.update(&mut cx, |this, cx| {
207 cx.notify();
208 this.clear_contacts()
209 })?
210 .await;
211 }
212 Status::ConnectionLost => {
213 this.update(&mut cx, |this, cx| {
214 cx.notify();
215 this.clear_contacts()
216 })?
217 .await;
218 }
219 _ => {}
220 }
221 }
222 Ok(())
223 }),
224 pending_contact_requests: Default::default(),
225 weak_self: cx.weak_model(),
226 }
227 }
228
229 #[cfg(feature = "test-support")]
230 pub fn clear_cache(&mut self) {
231 self.users.clear();
232 }
233
234 async fn handle_update_invite_info(
235 this: Model<Self>,
236 message: TypedEnvelope<proto::UpdateInviteInfo>,
237 _: Arc<Client>,
238 mut cx: AsyncAppContext,
239 ) -> Result<()> {
240 this.update(&mut cx, |this, cx| {
241 this.invite_info = Some(InviteInfo {
242 url: Arc::from(message.payload.url),
243 count: message.payload.count,
244 });
245 cx.notify();
246 })?;
247 Ok(())
248 }
249
250 async fn handle_show_contacts(
251 this: Model<Self>,
252 _: TypedEnvelope<proto::ShowContacts>,
253 _: Arc<Client>,
254 mut cx: AsyncAppContext,
255 ) -> Result<()> {
256 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
257 Ok(())
258 }
259
260 pub fn invite_info(&self) -> Option<&InviteInfo> {
261 self.invite_info.as_ref()
262 }
263
264 async fn handle_update_contacts(
265 this: Model<Self>,
266 message: TypedEnvelope<proto::UpdateContacts>,
267 _: Arc<Client>,
268 mut cx: AsyncAppContext,
269 ) -> Result<()> {
270 this.update(&mut cx, |this, _| {
271 this.update_contacts_tx
272 .unbounded_send(UpdateContacts::Update(message.payload))
273 .unwrap();
274 })?;
275 Ok(())
276 }
277
278 fn update_contacts(
279 &mut self,
280 message: UpdateContacts,
281 cx: &mut ModelContext<Self>,
282 ) -> Task<Result<()>> {
283 match message {
284 UpdateContacts::Wait(barrier) => {
285 drop(barrier);
286 Task::ready(Ok(()))
287 }
288 UpdateContacts::Clear(barrier) => {
289 self.contacts.clear();
290 self.incoming_contact_requests.clear();
291 self.outgoing_contact_requests.clear();
292 drop(barrier);
293 Task::ready(Ok(()))
294 }
295 UpdateContacts::Update(message) => {
296 let mut user_ids = HashSet::default();
297 for contact in &message.contacts {
298 user_ids.insert(contact.user_id);
299 }
300 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
301 user_ids.extend(message.outgoing_requests.iter());
302
303 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
304 cx.spawn(|this, mut cx| async move {
305 load_users.await?;
306
307 // Users are fetched in parallel above and cached in call to get_users
308 // No need to parallelize here
309 let mut updated_contacts = Vec::new();
310 let this = this
311 .upgrade()
312 .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
313 for contact in message.contacts {
314 updated_contacts.push(Arc::new(
315 Contact::from_proto(contact, &this, &mut cx).await?,
316 ));
317 }
318
319 let mut incoming_requests = Vec::new();
320 for request in message.incoming_requests {
321 incoming_requests.push({
322 this.update(&mut cx, |this, cx| {
323 this.get_user(request.requester_id, cx)
324 })?
325 .await?
326 });
327 }
328
329 let mut outgoing_requests = Vec::new();
330 for requested_user_id in message.outgoing_requests {
331 outgoing_requests.push(
332 this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
333 .await?,
334 );
335 }
336
337 let removed_contacts =
338 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
339 let removed_incoming_requests =
340 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
341 let removed_outgoing_requests =
342 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
343
344 this.update(&mut cx, |this, cx| {
345 // Remove contacts
346 this.contacts
347 .retain(|contact| !removed_contacts.contains(&contact.user.id));
348 // Update existing contacts and insert new ones
349 for updated_contact in updated_contacts {
350 match this.contacts.binary_search_by_key(
351 &&updated_contact.user.github_login,
352 |contact| &contact.user.github_login,
353 ) {
354 Ok(ix) => this.contacts[ix] = updated_contact,
355 Err(ix) => this.contacts.insert(ix, updated_contact),
356 }
357 }
358
359 // Remove incoming contact requests
360 this.incoming_contact_requests.retain(|user| {
361 if removed_incoming_requests.contains(&user.id) {
362 cx.emit(Event::Contact {
363 user: user.clone(),
364 kind: ContactEventKind::Cancelled,
365 });
366 false
367 } else {
368 true
369 }
370 });
371 // Update existing incoming requests and insert new ones
372 for user in incoming_requests {
373 match this
374 .incoming_contact_requests
375 .binary_search_by_key(&&user.github_login, |contact| {
376 &contact.github_login
377 }) {
378 Ok(ix) => this.incoming_contact_requests[ix] = user,
379 Err(ix) => this.incoming_contact_requests.insert(ix, user),
380 }
381 }
382
383 // Remove outgoing contact requests
384 this.outgoing_contact_requests
385 .retain(|user| !removed_outgoing_requests.contains(&user.id));
386 // Update existing incoming requests and insert new ones
387 for request in outgoing_requests {
388 match this
389 .outgoing_contact_requests
390 .binary_search_by_key(&&request.github_login, |contact| {
391 &contact.github_login
392 }) {
393 Ok(ix) => this.outgoing_contact_requests[ix] = request,
394 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
395 }
396 }
397
398 cx.notify();
399 })?;
400
401 Ok(())
402 })
403 }
404 }
405 }
406
407 pub fn contacts(&self) -> &[Arc<Contact>] {
408 &self.contacts
409 }
410
411 pub fn has_contact(&self, user: &Arc<User>) -> bool {
412 self.contacts
413 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
414 .is_ok()
415 }
416
417 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
418 &self.incoming_contact_requests
419 }
420
421 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
422 &self.outgoing_contact_requests
423 }
424
425 pub fn is_contact_request_pending(&self, user: &User) -> bool {
426 self.pending_contact_requests.contains_key(&user.id)
427 }
428
429 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
430 if self
431 .contacts
432 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
433 .is_ok()
434 {
435 ContactRequestStatus::RequestAccepted
436 } else if self
437 .outgoing_contact_requests
438 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
439 .is_ok()
440 {
441 ContactRequestStatus::RequestSent
442 } else if self
443 .incoming_contact_requests
444 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
445 .is_ok()
446 {
447 ContactRequestStatus::RequestReceived
448 } else {
449 ContactRequestStatus::None
450 }
451 }
452
453 pub fn request_contact(
454 &mut self,
455 responder_id: u64,
456 cx: &mut ModelContext<Self>,
457 ) -> Task<Result<()>> {
458 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
459 }
460
461 pub fn remove_contact(
462 &mut self,
463 user_id: u64,
464 cx: &mut ModelContext<Self>,
465 ) -> Task<Result<()>> {
466 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
467 }
468
469 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
470 self.incoming_contact_requests
471 .iter()
472 .any(|user| user.id == user_id)
473 }
474
475 pub fn respond_to_contact_request(
476 &mut self,
477 requester_id: u64,
478 accept: bool,
479 cx: &mut ModelContext<Self>,
480 ) -> Task<Result<()>> {
481 self.perform_contact_request(
482 requester_id,
483 proto::RespondToContactRequest {
484 requester_id,
485 response: if accept {
486 proto::ContactRequestResponse::Accept
487 } else {
488 proto::ContactRequestResponse::Decline
489 } as i32,
490 },
491 cx,
492 )
493 }
494
495 pub fn dismiss_contact_request(
496 &mut self,
497 requester_id: u64,
498 cx: &mut ModelContext<Self>,
499 ) -> Task<Result<()>> {
500 let client = self.client.upgrade();
501 cx.spawn(move |_, _| async move {
502 client
503 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
504 .request(proto::RespondToContactRequest {
505 requester_id,
506 response: proto::ContactRequestResponse::Dismiss as i32,
507 })
508 .await?;
509 Ok(())
510 })
511 }
512
513 fn perform_contact_request<T: RequestMessage>(
514 &mut self,
515 user_id: u64,
516 request: T,
517 cx: &mut ModelContext<Self>,
518 ) -> Task<Result<()>> {
519 let client = self.client.upgrade();
520 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
521 cx.notify();
522
523 cx.spawn(move |this, mut cx| async move {
524 let response = client
525 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
526 .request(request)
527 .await;
528 this.update(&mut cx, |this, cx| {
529 if let Entry::Occupied(mut request_count) =
530 this.pending_contact_requests.entry(user_id)
531 {
532 *request_count.get_mut() -= 1;
533 if *request_count.get() == 0 {
534 request_count.remove();
535 }
536 }
537 cx.notify();
538 })?;
539 response?;
540 Ok(())
541 })
542 }
543
544 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
545 let (tx, mut rx) = postage::barrier::channel();
546 self.update_contacts_tx
547 .unbounded_send(UpdateContacts::Clear(tx))
548 .unwrap();
549 async move {
550 rx.next().await;
551 }
552 }
553
554 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
555 let (tx, mut rx) = postage::barrier::channel();
556 self.update_contacts_tx
557 .unbounded_send(UpdateContacts::Wait(tx))
558 .unwrap();
559 async move {
560 rx.next().await;
561 }
562 }
563
564 pub fn get_users(
565 &mut self,
566 user_ids: Vec<u64>,
567 cx: &mut ModelContext<Self>,
568 ) -> Task<Result<Vec<Arc<User>>>> {
569 let mut user_ids_to_fetch = user_ids.clone();
570 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
571
572 cx.spawn(|this, mut cx| async move {
573 if !user_ids_to_fetch.is_empty() {
574 this.update(&mut cx, |this, cx| {
575 this.load_users(
576 proto::GetUsers {
577 user_ids: user_ids_to_fetch,
578 },
579 cx,
580 )
581 })?
582 .await?;
583 }
584
585 this.update(&mut cx, |this, _| {
586 user_ids
587 .iter()
588 .map(|user_id| {
589 this.users
590 .get(user_id)
591 .cloned()
592 .ok_or_else(|| anyhow!("user {} not found", user_id))
593 })
594 .collect()
595 })?
596 })
597 }
598
599 pub fn fuzzy_search_users(
600 &mut self,
601 query: String,
602 cx: &mut ModelContext<Self>,
603 ) -> Task<Result<Vec<Arc<User>>>> {
604 self.load_users(proto::FuzzySearchUsers { query }, cx)
605 }
606
607 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
608 self.users.get(&user_id).cloned()
609 }
610
611 pub fn get_user_optimistic(
612 &mut self,
613 user_id: u64,
614 cx: &mut ModelContext<Self>,
615 ) -> Option<Arc<User>> {
616 if let Some(user) = self.users.get(&user_id).cloned() {
617 return Some(user);
618 }
619
620 self.get_user(user_id, cx).detach_and_log_err(cx);
621 None
622 }
623
624 pub fn get_user(
625 &mut self,
626 user_id: u64,
627 cx: &mut ModelContext<Self>,
628 ) -> Task<Result<Arc<User>>> {
629 if let Some(user) = self.users.get(&user_id).cloned() {
630 return Task::ready(Ok(user));
631 }
632
633 let load_users = self.get_users(vec![user_id], cx);
634 cx.spawn(move |this, mut cx| async move {
635 load_users.await?;
636 this.update(&mut cx, |this, _| {
637 this.users
638 .get(&user_id)
639 .cloned()
640 .ok_or_else(|| anyhow!("server responded with no users"))
641 })?
642 })
643 }
644
645 pub fn current_user(&self) -> Option<Arc<User>> {
646 self.current_user.borrow().clone()
647 }
648
649 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
650 self.current_user.clone()
651 }
652
653 fn load_users(
654 &mut self,
655 request: impl RequestMessage<Response = UsersResponse>,
656 cx: &mut ModelContext<Self>,
657 ) -> Task<Result<Vec<Arc<User>>>> {
658 let client = self.client.clone();
659 cx.spawn(|this, mut cx| async move {
660 if let Some(rpc) = client.upgrade() {
661 let response = rpc.request(request).await.context("error loading users")?;
662 let users = response
663 .users
664 .into_iter()
665 .map(User::new)
666 .collect::<Vec<_>>();
667
668 this.update(&mut cx, |this, _| {
669 for user in &users {
670 this.users.insert(user.id, user.clone());
671 }
672 })
673 .ok();
674
675 Ok(users)
676 } else {
677 Ok(Vec::new())
678 }
679 })
680 }
681
682 pub fn set_participant_indices(
683 &mut self,
684 participant_indices: HashMap<u64, ParticipantIndex>,
685 cx: &mut ModelContext<Self>,
686 ) {
687 if participant_indices != self.participant_indices {
688 self.participant_indices = participant_indices;
689 cx.emit(Event::ParticipantIndicesChanged);
690 }
691 }
692
693 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
694 &self.participant_indices
695 }
696
697 pub fn participant_names(
698 &self,
699 user_ids: impl Iterator<Item = u64>,
700 cx: &AppContext,
701 ) -> HashMap<u64, SharedString> {
702 let mut ret = HashMap::default();
703 let mut missing_user_ids = Vec::new();
704 for id in user_ids {
705 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
706 ret.insert(id, github_login.into());
707 } else {
708 missing_user_ids.push(id)
709 }
710 }
711 if !missing_user_ids.is_empty() {
712 let this = self.weak_self.clone();
713 cx.spawn(|mut cx| async move {
714 this.update(&mut cx, |this, cx| this.get_users(missing_user_ids, cx))?
715 .await
716 })
717 .detach_and_log_err(cx);
718 }
719 ret
720 }
721}
722
723impl User {
724 fn new(message: proto::User) -> Arc<Self> {
725 Arc::new(User {
726 id: message.id,
727 github_login: message.github_login,
728 avatar_uri: message.avatar_url.into(),
729 })
730 }
731}
732
733impl Contact {
734 async fn from_proto(
735 contact: proto::Contact,
736 user_store: &Model<UserStore>,
737 cx: &mut AsyncAppContext,
738 ) -> Result<Self> {
739 let user = user_store
740 .update(cx, |user_store, cx| {
741 user_store.get_user(contact.user_id, cx)
742 })?
743 .await?;
744 Ok(Self {
745 user,
746 online: contact.online,
747 busy: contact.busy,
748 })
749 }
750}
751
752impl Collaborator {
753 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
754 Ok(Self {
755 peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
756 replica_id: message.replica_id as ReplicaId,
757 user_id: message.user_id as UserId,
758 })
759 }
760}