1use super::{proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use feature_flags::FeatureFlagAppExt;
5use futures::{channel::mpsc, Future, StreamExt};
6use gpui::{
7 AppContext, AsyncAppContext, EventEmitter, Model, ModelContext, SharedString, SharedUri, Task,
8 WeakModel,
9};
10use postage::{sink::Sink, watch};
11use rpc::proto::{RequestMessage, UsersResponse};
12use std::sync::{Arc, Weak};
13use text::ReplicaId;
14use util::TryFutureExt as _;
15
/// Numeric identifier of a user; this is the type of [`User::id`].
pub type UserId = u64;
17
/// Identifier of a channel, wrapping the raw `u64` value.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ChannelId(pub u64);

impl std::fmt::Display for ChannelId {
    /// Formats the wrapped id exactly as the underlying `u64` would be formatted,
    /// honoring any width/fill flags carried by the formatter.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        std::fmt::Display::fmt(raw, f)
    }
}
26
/// Identifier of a project, wrapping the raw `u64` value.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);
29
/// Identifier of a dev server, wrapping the raw `u64` value.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct DevServerId(pub u64);
32
/// Identifier of a remote project, wrapping the raw `u64` value.
///
/// Unlike the other id newtypes in this file, this one is also serializable via serde.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct RemoteProjectId(pub u64);
37
/// Index assigned to a participant, wrapping a raw `u32`.
///
/// `UserStore` stores these in a map keyed by `u64` (presumably user ids — confirm with callers).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
40
/// A user as known to the client, built from a `proto::User` message.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric id of the user.
    pub id: UserId,
    /// GitHub login; contact lists in `UserStore` are kept sorted by this field.
    pub github_login: String,
    /// URI of the user's avatar, taken from the proto message's `avatar_url`.
    pub avatar_uri: SharedUri,
}
47
/// A collaborator, built from a `proto::Collaborator` message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id carried by the proto message (required; its absence is an error on conversion).
    pub peer_id: proto::PeerId,
    /// Replica id, cast from the proto message's `replica_id`.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
}
54
55impl PartialOrd for User {
56 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
57 Some(self.cmp(other))
58 }
59}
60
61impl Ord for User {
62 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
63 self.github_login.cmp(&other.github_login)
64 }
65}
66
67impl PartialEq for User {
68 fn eq(&self, other: &Self) -> bool {
69 self.id == other.id && self.github_login == other.github_login
70 }
71}
72
73impl Eq for User {}
74
/// A contact of the current user, built from a `proto::Contact` message.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// `online` flag copied from the proto message.
    pub online: bool,
    /// `busy` flag copied from the proto message.
    pub busy: bool,
}
81
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any known request.
    None,
    /// The user is in the store's outgoing contact requests.
    RequestSent,
    /// The user is in the store's incoming contact requests.
    RequestReceived,
    /// The user is already in the store's contact list.
    RequestAccepted,
}
89
/// Caches users and tracks the current user's contacts and contact requests.
pub struct UserStore {
    /// Cache of users keyed by user id; filled by `load_users`.
    users: HashMap<u64, Arc<User>>,
    /// Participant indices keyed by `u64` (presumably user ids — confirm with callers).
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sending side of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Watch channel holding the currently signed-in user, if any.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept sorted by GitHub login so they can be binary-searched.
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, kept sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, kept sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact-related requests per user id.
    pending_contact_requests: HashMap<u64, usize>,
    /// Latest invite info received from the server, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the RPC client; upgraded on demand before each request.
    client: Weak<Client>,
    /// Background task applying queued contact updates one at a time.
    _maintain_contacts: Task<()>,
    /// Background task reacting to client status changes.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this store's own model, used from `&self` methods that need to spawn work.
    weak_self: WeakModel<Self>,
}
105
/// Invite information received through an `UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// `count` field of the message (its exact meaning is defined by the server — confirm).
    pub count: u32,
    /// Invite URL from the message.
    pub url: Arc<str>,
}
111
/// Events emitted by `UserStore`.
pub enum Event {
    /// Something happened to a contact request involving `user`; `kind` says what.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `ShowContacts` message arrives from the server.
    ShowContacts,
    /// Emitted when `set_participant_indices` stores a map that differs from the previous one.
    ParticipantIndicesChanged,
}
120
/// What happened to a contact request in an `Event::Contact`.
///
/// NOTE(review): within this file only `Cancelled` is emitted (when an incoming request is
/// removed); the other variants are presumably produced or consumed elsewhere.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    Cancelled,
}
127
// Allows `UserStore` to emit `Event` values through its model context.
impl EventEmitter<Event> for UserStore {}
129
/// Messages processed in order by the store's contact-maintenance task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender once this message is reached
    /// (sent by `contact_updates_done`).
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender
    /// (sent by `clear_contacts`).
    Clear(postage::barrier::Sender),
}
135
136impl UserStore {
    /// Creates the store, registers its RPC message handlers and starts two background tasks.
    ///
    /// * `_maintain_contacts` drains the contact-update queue, applying one message at a
    ///   time and logging any error a message produces.
    /// * `_maintain_current_user` follows the client's connection status: on connect it
    ///   loads the signed-in user and their private info, on sign-out or connection loss
    ///   it clears the contact lists.
    pub fn new(client: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_model(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_model(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_model(), Self::handle_show_contacts),
        ];
        Self {
            users: Default::default(),
            current_user: current_user_rx,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(|this, mut cx| async move {
                // Move the subscriptions into the task so they live as long as it does.
                let _subscriptions = rpc_subscriptions;
                while let Some(message) = update_contacts_rx.next().await {
                    if let Ok(task) =
                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
                    {
                        // Await each update before taking the next so messages apply in order.
                        task.log_err().await;
                    } else {
                        // The store can no longer be updated; stop processing.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(|this, mut cx| async move {
                let mut status = client.status();
                // Keep only a weak reference so this task does not keep the client alive.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // if the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                let fetch_user = if let Ok(fetch_user) = this
                                    .update(&mut cx, |this, cx| {
                                        this.get_user(user_id, cx).log_err()
                                    }) {
                                    fetch_user
                                } else {
                                    break;
                                };
                                let fetch_metrics_id =
                                    client.request(proto::GetPrivateUserInfo {}).log_err();
                                // Run both requests concurrently; each yields `None` on failure
                                // because errors were already logged by `log_err`.
                                let (user, info) = futures::join!(fetch_user, fetch_metrics_id);

                                cx.update(|cx| {
                                    if let Some(info) = info {
                                        cx.update_flags(info.staff, info.flags);
                                        client.telemetry.set_authenticated_user_info(
                                            Some(info.metrics_id.clone()),
                                            info.staff,
                                        )
                                    }
                                })?;

                                // Publish the (possibly absent) user to watchers.
                                current_user_tx.send(user).await.ok();

                                this.update(&mut cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            current_user_tx.send(None).await.ok();
                            this.update(&mut cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // Unlike sign-out, the current user is kept; only contacts are cleared.
                            this.update(&mut cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_model(),
        }
    }
230
    /// Empties the user cache. Only compiled with the `test-support` feature.
    #[cfg(feature = "test-support")]
    pub fn clear_cache(&mut self) {
        self.users.clear();
    }
235
236 async fn handle_update_invite_info(
237 this: Model<Self>,
238 message: TypedEnvelope<proto::UpdateInviteInfo>,
239 _: Arc<Client>,
240 mut cx: AsyncAppContext,
241 ) -> Result<()> {
242 this.update(&mut cx, |this, cx| {
243 this.invite_info = Some(InviteInfo {
244 url: Arc::from(message.payload.url),
245 count: message.payload.count,
246 });
247 cx.notify();
248 })?;
249 Ok(())
250 }
251
252 async fn handle_show_contacts(
253 this: Model<Self>,
254 _: TypedEnvelope<proto::ShowContacts>,
255 _: Arc<Client>,
256 mut cx: AsyncAppContext,
257 ) -> Result<()> {
258 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
259 Ok(())
260 }
261
    /// Returns the most recently received invite info, or `None` if none has arrived yet.
    pub fn invite_info(&self) -> Option<&InviteInfo> {
        self.invite_info.as_ref()
    }
265
266 async fn handle_update_contacts(
267 this: Model<Self>,
268 message: TypedEnvelope<proto::UpdateContacts>,
269 _: Arc<Client>,
270 mut cx: AsyncAppContext,
271 ) -> Result<()> {
272 this.update(&mut cx, |this, _| {
273 this.update_contacts_tx
274 .unbounded_send(UpdateContacts::Update(message.payload))
275 .unwrap();
276 })?;
277 Ok(())
278 }
279
    /// Applies one queued `UpdateContacts` message.
    ///
    /// `Wait` and `Clear` complete immediately. `Update` first loads every user referenced
    /// by the message, then merges the message into the three lists (contacts, incoming
    /// requests, outgoing requests), each of which is kept sorted by GitHub login.
    fn update_contacts(
        &mut self,
        message: UpdateContacts,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                // The paired receiver is awaited in `contact_updates_done`.
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                // The paired receiver is awaited in `clear_contacts`.
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id the message mentions so they can be fetched in one go.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(|this, mut cx| async move {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this
                        .upgrade()
                        .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
                    for contact in message.contacts {
                        updated_contacts.push(Arc::new(
                            Contact::from_proto(contact, &this, &mut cx).await?,
                        ));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(&mut cx, |this, cx| {
                                this.get_user(request.requester_id, cx)
                            })?
                            .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(&mut cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for updated_contact in updated_contacts {
                            // `Ok` means an entry with this login exists and is replaced;
                            // `Err` gives the insertion point that keeps the list sorted.
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
408
409 pub fn contacts(&self) -> &[Arc<Contact>] {
410 &self.contacts
411 }
412
413 pub fn has_contact(&self, user: &Arc<User>) -> bool {
414 self.contacts
415 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
416 .is_ok()
417 }
418
419 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
420 &self.incoming_contact_requests
421 }
422
423 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
424 &self.outgoing_contact_requests
425 }
426
    /// Returns whether at least one contact-related request concerning `user` is still in flight.
    pub fn is_contact_request_pending(&self, user: &User) -> bool {
        self.pending_contact_requests.contains_key(&user.id)
    }
430
431 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
432 if self
433 .contacts
434 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
435 .is_ok()
436 {
437 ContactRequestStatus::RequestAccepted
438 } else if self
439 .outgoing_contact_requests
440 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
441 .is_ok()
442 {
443 ContactRequestStatus::RequestSent
444 } else if self
445 .incoming_contact_requests
446 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
447 .is_ok()
448 {
449 ContactRequestStatus::RequestReceived
450 } else {
451 ContactRequestStatus::None
452 }
453 }
454
455 pub fn request_contact(
456 &mut self,
457 responder_id: u64,
458 cx: &mut ModelContext<Self>,
459 ) -> Task<Result<()>> {
460 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
461 }
462
463 pub fn remove_contact(
464 &mut self,
465 user_id: u64,
466 cx: &mut ModelContext<Self>,
467 ) -> Task<Result<()>> {
468 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
469 }
470
471 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
472 self.incoming_contact_requests
473 .iter()
474 .any(|user| user.id == user_id)
475 }
476
477 pub fn respond_to_contact_request(
478 &mut self,
479 requester_id: u64,
480 accept: bool,
481 cx: &mut ModelContext<Self>,
482 ) -> Task<Result<()>> {
483 self.perform_contact_request(
484 requester_id,
485 proto::RespondToContactRequest {
486 requester_id,
487 response: if accept {
488 proto::ContactRequestResponse::Accept
489 } else {
490 proto::ContactRequestResponse::Decline
491 } as i32,
492 },
493 cx,
494 )
495 }
496
497 pub fn dismiss_contact_request(
498 &mut self,
499 requester_id: u64,
500 cx: &mut ModelContext<Self>,
501 ) -> Task<Result<()>> {
502 let client = self.client.upgrade();
503 cx.spawn(move |_, _| async move {
504 client
505 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
506 .request(proto::RespondToContactRequest {
507 requester_id,
508 response: proto::ContactRequestResponse::Dismiss as i32,
509 })
510 .await?;
511 Ok(())
512 })
513 }
514
515 fn perform_contact_request<T: RequestMessage>(
516 &mut self,
517 user_id: u64,
518 request: T,
519 cx: &mut ModelContext<Self>,
520 ) -> Task<Result<()>> {
521 let client = self.client.upgrade();
522 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
523 cx.notify();
524
525 cx.spawn(move |this, mut cx| async move {
526 let response = client
527 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
528 .request(request)
529 .await;
530 this.update(&mut cx, |this, cx| {
531 if let Entry::Occupied(mut request_count) =
532 this.pending_contact_requests.entry(user_id)
533 {
534 *request_count.get_mut() -= 1;
535 if *request_count.get() == 0 {
536 request_count.remove();
537 }
538 }
539 cx.notify();
540 })?;
541 response?;
542 Ok(())
543 })
544 }
545
546 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
547 let (tx, mut rx) = postage::barrier::channel();
548 self.update_contacts_tx
549 .unbounded_send(UpdateContacts::Clear(tx))
550 .unwrap();
551 async move {
552 rx.next().await;
553 }
554 }
555
556 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
557 let (tx, mut rx) = postage::barrier::channel();
558 self.update_contacts_tx
559 .unbounded_send(UpdateContacts::Wait(tx))
560 .unwrap();
561 async move {
562 rx.next().await;
563 }
564 }
565
566 pub fn get_users(
567 &mut self,
568 user_ids: Vec<u64>,
569 cx: &mut ModelContext<Self>,
570 ) -> Task<Result<Vec<Arc<User>>>> {
571 let mut user_ids_to_fetch = user_ids.clone();
572 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
573
574 cx.spawn(|this, mut cx| async move {
575 if !user_ids_to_fetch.is_empty() {
576 this.update(&mut cx, |this, cx| {
577 this.load_users(
578 proto::GetUsers {
579 user_ids: user_ids_to_fetch,
580 },
581 cx,
582 )
583 })?
584 .await?;
585 }
586
587 this.update(&mut cx, |this, _| {
588 user_ids
589 .iter()
590 .map(|user_id| {
591 this.users
592 .get(user_id)
593 .cloned()
594 .ok_or_else(|| anyhow!("user {} not found", user_id))
595 })
596 .collect()
597 })?
598 })
599 }
600
601 pub fn fuzzy_search_users(
602 &mut self,
603 query: String,
604 cx: &mut ModelContext<Self>,
605 ) -> Task<Result<Vec<Arc<User>>>> {
606 self.load_users(proto::FuzzySearchUsers { query }, cx)
607 }
608
609 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
610 self.users.get(&user_id).cloned()
611 }
612
613 pub fn get_user_optimistic(
614 &mut self,
615 user_id: u64,
616 cx: &mut ModelContext<Self>,
617 ) -> Option<Arc<User>> {
618 if let Some(user) = self.users.get(&user_id).cloned() {
619 return Some(user);
620 }
621
622 self.get_user(user_id, cx).detach_and_log_err(cx);
623 None
624 }
625
626 pub fn get_user(
627 &mut self,
628 user_id: u64,
629 cx: &mut ModelContext<Self>,
630 ) -> Task<Result<Arc<User>>> {
631 if let Some(user) = self.users.get(&user_id).cloned() {
632 return Task::ready(Ok(user));
633 }
634
635 let load_users = self.get_users(vec![user_id], cx);
636 cx.spawn(move |this, mut cx| async move {
637 load_users.await?;
638 this.update(&mut cx, |this, _| {
639 this.users
640 .get(&user_id)
641 .cloned()
642 .ok_or_else(|| anyhow!("server responded with no users"))
643 })?
644 })
645 }
646
    /// Returns a snapshot of the currently signed-in user, or `None` if there is none.
    pub fn current_user(&self) -> Option<Arc<User>> {
        self.current_user.borrow().clone()
    }
650
    /// Returns a new receiver on the current-user watch channel, for observing changes.
    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
        self.current_user.clone()
    }
654
655 fn load_users(
656 &mut self,
657 request: impl RequestMessage<Response = UsersResponse>,
658 cx: &mut ModelContext<Self>,
659 ) -> Task<Result<Vec<Arc<User>>>> {
660 let client = self.client.clone();
661 cx.spawn(|this, mut cx| async move {
662 if let Some(rpc) = client.upgrade() {
663 let response = rpc.request(request).await.context("error loading users")?;
664 let users = response
665 .users
666 .into_iter()
667 .map(User::new)
668 .collect::<Vec<_>>();
669
670 this.update(&mut cx, |this, _| {
671 for user in &users {
672 this.users.insert(user.id, user.clone());
673 }
674 })
675 .ok();
676
677 Ok(users)
678 } else {
679 Ok(Vec::new())
680 }
681 })
682 }
683
684 pub fn set_participant_indices(
685 &mut self,
686 participant_indices: HashMap<u64, ParticipantIndex>,
687 cx: &mut ModelContext<Self>,
688 ) {
689 if participant_indices != self.participant_indices {
690 self.participant_indices = participant_indices;
691 cx.emit(Event::ParticipantIndicesChanged);
692 }
693 }
694
    /// Returns the participant index map last stored via `set_participant_indices`.
    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
        &self.participant_indices
    }
698
699 pub fn participant_names(
700 &self,
701 user_ids: impl Iterator<Item = u64>,
702 cx: &AppContext,
703 ) -> HashMap<u64, SharedString> {
704 let mut ret = HashMap::default();
705 let mut missing_user_ids = Vec::new();
706 for id in user_ids {
707 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
708 ret.insert(id, github_login.into());
709 } else {
710 missing_user_ids.push(id)
711 }
712 }
713 if !missing_user_ids.is_empty() {
714 let this = self.weak_self.clone();
715 cx.spawn(|mut cx| async move {
716 this.update(&mut cx, |this, cx| this.get_users(missing_user_ids, cx))?
717 .await
718 })
719 .detach_and_log_err(cx);
720 }
721 ret
722 }
723}
724
725impl User {
726 fn new(message: proto::User) -> Arc<Self> {
727 Arc::new(User {
728 id: message.id,
729 github_login: message.github_login,
730 avatar_uri: message.avatar_url.into(),
731 })
732 }
733}
734
735impl Contact {
736 async fn from_proto(
737 contact: proto::Contact,
738 user_store: &Model<UserStore>,
739 cx: &mut AsyncAppContext,
740 ) -> Result<Self> {
741 let user = user_store
742 .update(cx, |user_store, cx| {
743 user_store.get_user(contact.user_id, cx)
744 })?
745 .await?;
746 Ok(Self {
747 user,
748 online: contact.online,
749 busy: contact.busy,
750 })
751 }
752}
753
754impl Collaborator {
755 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
756 Ok(Self {
757 peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
758 replica_id: message.replica_id as ReplicaId,
759 user_id: message.user_id as UserId,
760 })
761 }
762}