1use super::{proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use feature_flags::FeatureFlagAppExt;
5use futures::{channel::mpsc, Future, StreamExt};
6use gpui::{
7 AppContext, AsyncAppContext, EventEmitter, Model, ModelContext, SharedString, SharedUri, Task,
8 WeakModel,
9};
10use postage::{sink::Sink, watch};
11use rpc::proto::{RequestMessage, UsersResponse};
12use std::sync::{Arc, Weak};
13use text::ReplicaId;
14use util::TryFutureExt as _;
15
/// Numeric identifier of a user; this is the type of [`User::id`].
pub type UserId = u64;
17
/// Newtype wrapper around the raw `u64` id of a channel.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ChannelId(pub u64);
20
21impl std::fmt::Display for ChannelId {
22 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 self.0.fmt(f)
24 }
25}
26
/// Newtype wrapper around the raw `u64` id of a hosted project.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct HostedProjectId(pub u64);
29
/// Newtype wrapper around a `u32` participant index.
///
/// NOTE(review): the meaning of the index is defined by callers of
/// `UserStore::set_participant_indices`; it is not visible in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
32
/// A user as known to the client.
///
/// Equality considers `id` and `github_login`; ordering is implemented by
/// hand further down in this file.
#[derive(Default, Debug)]
pub struct User {
    /// Server-assigned user id.
    pub id: UserId,
    /// The user's GitHub login; contact lists are sorted and searched by it.
    pub github_login: String,
    /// URI of the user's avatar, converted from the `avatar_url` proto field.
    pub avatar_uri: SharedUri,
}
39
/// A collaborator decoded from a `proto::Collaborator` message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id taken from the proto message (required there).
    pub peer_id: proto::PeerId,
    /// Replica id, cast from the proto message's `replica_id`.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
}
46
47impl PartialOrd for User {
48 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
49 Some(self.cmp(other))
50 }
51}
52
53impl Ord for User {
54 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
55 self.github_login.cmp(&other.github_login)
56 }
57}
58
59impl PartialEq for User {
60 fn eq(&self, other: &Self) -> bool {
61 self.id == other.id && self.github_login == other.github_login
62 }
63}
64
// Marker impl: the hand-written `PartialEq` for `User` compares an integer id
// and a string, so equality is reflexive and `Eq` can be claimed.
impl Eq for User {}
66
/// A contact entry built from a `proto::Contact` message.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// Copied from the proto message's `online` flag.
    pub online: bool,
    /// Copied from the proto message's `busy` flag.
    pub busy: bool,
}
73
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any tracked request.
    None,
    /// The user appears in the outgoing contact requests.
    RequestSent,
    /// The user appears in the incoming contact requests.
    RequestReceived,
    /// The user appears in the contacts list.
    RequestAccepted,
}
81
/// Model that caches users and tracks the current user, their contacts and
/// their pending contact requests.
pub struct UserStore {
    /// Cache of users loaded so far, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Participant index per user id, as last set via
    /// `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sending half of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Watch receiver holding the current user (`None` when there is none).
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept sorted by GitHub login so they can be binary-searched.
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id; an entry is removed
    /// once its count drops back to zero.
    pending_contact_requests: HashMap<u64, usize>,
    /// Most recent invite info received from the server, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the RPC client; upgraded on demand before each request.
    client: Weak<Client>,
    /// Task that applies queued `UpdateContacts` messages one at a time.
    _maintain_contacts: Task<()>,
    /// Task that reacts to client status changes (see `new`).
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this model, used to spawn work from `&self` methods.
    weak_self: WeakModel<Self>,
}
97
/// Invite information taken from a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The `count` field of the message.
    /// NOTE(review): presumably the number of invites; confirm against the server.
    pub count: u32,
    /// The `url` field of the message.
    pub url: Arc<str>,
}
103
/// Events emitted by [`UserStore`].
pub enum Event {
    /// Something happened to a contact request involving `user`.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted when `set_participant_indices` actually changes the indices.
    ParticipantIndicesChanged,
}
112
/// What happened to a contact request in an [`Event::Contact`].
///
/// NOTE(review): within this file only `Cancelled` is emitted (when an
/// incoming request is removed); `Requested` and `Accepted` are not emitted
/// here. Confirm whether other code produces them.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    Cancelled,
}
119
// Lets `UserStore` emit `Event` values through `cx.emit`.
impl EventEmitter<Event> for UserStore {}
121
/// Messages processed sequentially by the `_maintain_contacts` task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender, which lets a caller observe
    /// that every previously queued message has been handled.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
127
128impl UserStore {
    /// Creates the store, registers its RPC message handlers on `client` and
    /// spawns the two background tasks it owns:
    ///
    /// * `_maintain_contacts` drains the `UpdateContacts` queue, applying one
    ///   message at a time via `update_contacts`.
    /// * `_maintain_current_user` follows the client's connection status to
    ///   keep the current user up to date and to clear contacts when signed
    ///   out or disconnected.
    pub fn new(client: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_model(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_model(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_model(), Self::handle_show_contacts),
        ];
        Self {
            users: Default::default(),
            current_user: current_user_rx,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(|this, mut cx| async move {
                // Move the subscriptions into the task so they live exactly
                // as long as the task does.
                let _subscriptions = rpc_subscriptions;
                while let Some(message) = update_contacts_rx.next().await {
                    // Each update is awaited before the next message is
                    // pulled, so updates are applied strictly in order.
                    if let Ok(task) =
                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store can no longer be updated; stop processing.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(|this, mut cx| async move {
                let mut status = client.status();
                // Keep only a weak reference between status changes so this
                // task does not hold a strong handle to the client.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // If the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                let fetch_user = if let Ok(fetch_user) = this
                                    .update(&mut cx, |this, cx| {
                                        this.get_user(user_id, cx).log_err()
                                    }) {
                                    fetch_user
                                } else {
                                    break;
                                };
                                let fetch_metrics_id =
                                    client.request(proto::GetPrivateUserInfo {}).log_err();
                                // Run both requests concurrently; `log_err`
                                // turns a failure into `None` for each.
                                let (user, info) = futures::join!(fetch_user, fetch_metrics_id);

                                cx.update(|cx| {
                                    if let Some(info) = info {
                                        cx.update_flags(info.staff, info.flags);
                                        client.telemetry.set_authenticated_user_info(
                                            Some(info.metrics_id.clone()),
                                            info.staff,
                                        )
                                    }
                                })?;

                                // Publish the (possibly absent) user to watchers.
                                current_user_tx.send(user).await.ok();

                                this.update(&mut cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            // Signing out resets the current user and clears contacts.
                            current_user_tx.send(None).await.ok();
                            this.update(&mut cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // Contacts are cleared, but the current user is kept.
                            this.update(&mut cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_model(),
        }
    }
222
223 #[cfg(feature = "test-support")]
224 pub fn clear_cache(&mut self) {
225 self.users.clear();
226 }
227
228 async fn handle_update_invite_info(
229 this: Model<Self>,
230 message: TypedEnvelope<proto::UpdateInviteInfo>,
231 _: Arc<Client>,
232 mut cx: AsyncAppContext,
233 ) -> Result<()> {
234 this.update(&mut cx, |this, cx| {
235 this.invite_info = Some(InviteInfo {
236 url: Arc::from(message.payload.url),
237 count: message.payload.count,
238 });
239 cx.notify();
240 })?;
241 Ok(())
242 }
243
244 async fn handle_show_contacts(
245 this: Model<Self>,
246 _: TypedEnvelope<proto::ShowContacts>,
247 _: Arc<Client>,
248 mut cx: AsyncAppContext,
249 ) -> Result<()> {
250 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
251 Ok(())
252 }
253
254 pub fn invite_info(&self) -> Option<&InviteInfo> {
255 self.invite_info.as_ref()
256 }
257
258 async fn handle_update_contacts(
259 this: Model<Self>,
260 message: TypedEnvelope<proto::UpdateContacts>,
261 _: Arc<Client>,
262 mut cx: AsyncAppContext,
263 ) -> Result<()> {
264 this.update(&mut cx, |this, _| {
265 this.update_contacts_tx
266 .unbounded_send(UpdateContacts::Update(message.payload))
267 .unwrap();
268 })?;
269 Ok(())
270 }
271
    /// Applies a single queued `UpdateContacts` message.
    ///
    /// * `Wait` only drops its barrier sender.
    /// * `Clear` empties the contacts and both request lists, then drops its
    ///   barrier sender.
    /// * `Update` loads every user referenced by the server message, then
    ///   merges additions and removals into the three lists, each of which
    ///   is kept sorted by GitHub login.
    ///
    /// The returned task resolves once the message has been fully applied.
    fn update_contacts(
        &mut self,
        message: UpdateContacts,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect the ids of every user mentioned by the message so
                // they can be loaded with a single `get_users` call.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(|this, mut cx| async move {
                    load_users.await?;

                    // Every referenced user was loaded and cached by the
                    // `get_users` call above, so the per-user lookups below
                    // do not need to be parallelized.
                    let mut updated_contacts = Vec::new();
                    let this = this
                        .upgrade()
                        .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
                    for contact in message.contacts {
                        updated_contacts.push(Arc::new(
                            Contact::from_proto(contact, &this, &mut cx).await?,
                        ));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(&mut cx, |this, cx| {
                                this.get_user(request.requester_id, cx)
                            })?
                            .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    // Sets of ids to remove, for constant-time membership tests.
                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(&mut cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones, keeping
                        // the list sorted by GitHub login.
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests, emitting a
                        // `Cancelled` event for each one that is dropped.
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
400
401 pub fn contacts(&self) -> &[Arc<Contact>] {
402 &self.contacts
403 }
404
405 pub fn has_contact(&self, user: &Arc<User>) -> bool {
406 self.contacts
407 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
408 .is_ok()
409 }
410
411 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
412 &self.incoming_contact_requests
413 }
414
415 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
416 &self.outgoing_contact_requests
417 }
418
419 pub fn is_contact_request_pending(&self, user: &User) -> bool {
420 self.pending_contact_requests.contains_key(&user.id)
421 }
422
423 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
424 if self
425 .contacts
426 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
427 .is_ok()
428 {
429 ContactRequestStatus::RequestAccepted
430 } else if self
431 .outgoing_contact_requests
432 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
433 .is_ok()
434 {
435 ContactRequestStatus::RequestSent
436 } else if self
437 .incoming_contact_requests
438 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
439 .is_ok()
440 {
441 ContactRequestStatus::RequestReceived
442 } else {
443 ContactRequestStatus::None
444 }
445 }
446
447 pub fn request_contact(
448 &mut self,
449 responder_id: u64,
450 cx: &mut ModelContext<Self>,
451 ) -> Task<Result<()>> {
452 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
453 }
454
455 pub fn remove_contact(
456 &mut self,
457 user_id: u64,
458 cx: &mut ModelContext<Self>,
459 ) -> Task<Result<()>> {
460 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
461 }
462
463 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
464 self.incoming_contact_requests
465 .iter()
466 .any(|user| user.id == user_id)
467 }
468
469 pub fn respond_to_contact_request(
470 &mut self,
471 requester_id: u64,
472 accept: bool,
473 cx: &mut ModelContext<Self>,
474 ) -> Task<Result<()>> {
475 self.perform_contact_request(
476 requester_id,
477 proto::RespondToContactRequest {
478 requester_id,
479 response: if accept {
480 proto::ContactRequestResponse::Accept
481 } else {
482 proto::ContactRequestResponse::Decline
483 } as i32,
484 },
485 cx,
486 )
487 }
488
489 pub fn dismiss_contact_request(
490 &mut self,
491 requester_id: u64,
492 cx: &mut ModelContext<Self>,
493 ) -> Task<Result<()>> {
494 let client = self.client.upgrade();
495 cx.spawn(move |_, _| async move {
496 client
497 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
498 .request(proto::RespondToContactRequest {
499 requester_id,
500 response: proto::ContactRequestResponse::Dismiss as i32,
501 })
502 .await?;
503 Ok(())
504 })
505 }
506
507 fn perform_contact_request<T: RequestMessage>(
508 &mut self,
509 user_id: u64,
510 request: T,
511 cx: &mut ModelContext<Self>,
512 ) -> Task<Result<()>> {
513 let client = self.client.upgrade();
514 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
515 cx.notify();
516
517 cx.spawn(move |this, mut cx| async move {
518 let response = client
519 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
520 .request(request)
521 .await;
522 this.update(&mut cx, |this, cx| {
523 if let Entry::Occupied(mut request_count) =
524 this.pending_contact_requests.entry(user_id)
525 {
526 *request_count.get_mut() -= 1;
527 if *request_count.get() == 0 {
528 request_count.remove();
529 }
530 }
531 cx.notify();
532 })?;
533 response?;
534 Ok(())
535 })
536 }
537
538 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
539 let (tx, mut rx) = postage::barrier::channel();
540 self.update_contacts_tx
541 .unbounded_send(UpdateContacts::Clear(tx))
542 .unwrap();
543 async move {
544 rx.next().await;
545 }
546 }
547
548 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
549 let (tx, mut rx) = postage::barrier::channel();
550 self.update_contacts_tx
551 .unbounded_send(UpdateContacts::Wait(tx))
552 .unwrap();
553 async move {
554 rx.next().await;
555 }
556 }
557
558 pub fn get_users(
559 &mut self,
560 user_ids: Vec<u64>,
561 cx: &mut ModelContext<Self>,
562 ) -> Task<Result<Vec<Arc<User>>>> {
563 let mut user_ids_to_fetch = user_ids.clone();
564 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
565
566 cx.spawn(|this, mut cx| async move {
567 if !user_ids_to_fetch.is_empty() {
568 this.update(&mut cx, |this, cx| {
569 this.load_users(
570 proto::GetUsers {
571 user_ids: user_ids_to_fetch,
572 },
573 cx,
574 )
575 })?
576 .await?;
577 }
578
579 this.update(&mut cx, |this, _| {
580 user_ids
581 .iter()
582 .map(|user_id| {
583 this.users
584 .get(user_id)
585 .cloned()
586 .ok_or_else(|| anyhow!("user {} not found", user_id))
587 })
588 .collect()
589 })?
590 })
591 }
592
593 pub fn fuzzy_search_users(
594 &mut self,
595 query: String,
596 cx: &mut ModelContext<Self>,
597 ) -> Task<Result<Vec<Arc<User>>>> {
598 self.load_users(proto::FuzzySearchUsers { query }, cx)
599 }
600
601 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
602 self.users.get(&user_id).cloned()
603 }
604
605 pub fn get_user_optimistic(
606 &mut self,
607 user_id: u64,
608 cx: &mut ModelContext<Self>,
609 ) -> Option<Arc<User>> {
610 if let Some(user) = self.users.get(&user_id).cloned() {
611 return Some(user);
612 }
613
614 self.get_user(user_id, cx).detach_and_log_err(cx);
615 None
616 }
617
618 pub fn get_user(
619 &mut self,
620 user_id: u64,
621 cx: &mut ModelContext<Self>,
622 ) -> Task<Result<Arc<User>>> {
623 if let Some(user) = self.users.get(&user_id).cloned() {
624 return Task::ready(Ok(user));
625 }
626
627 let load_users = self.get_users(vec![user_id], cx);
628 cx.spawn(move |this, mut cx| async move {
629 load_users.await?;
630 this.update(&mut cx, |this, _| {
631 this.users
632 .get(&user_id)
633 .cloned()
634 .ok_or_else(|| anyhow!("server responded with no users"))
635 })?
636 })
637 }
638
639 pub fn current_user(&self) -> Option<Arc<User>> {
640 self.current_user.borrow().clone()
641 }
642
643 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
644 self.current_user.clone()
645 }
646
647 fn load_users(
648 &mut self,
649 request: impl RequestMessage<Response = UsersResponse>,
650 cx: &mut ModelContext<Self>,
651 ) -> Task<Result<Vec<Arc<User>>>> {
652 let client = self.client.clone();
653 cx.spawn(|this, mut cx| async move {
654 if let Some(rpc) = client.upgrade() {
655 let response = rpc.request(request).await.context("error loading users")?;
656 let users = response
657 .users
658 .into_iter()
659 .map(User::new)
660 .collect::<Vec<_>>();
661
662 this.update(&mut cx, |this, _| {
663 for user in &users {
664 this.users.insert(user.id, user.clone());
665 }
666 })
667 .ok();
668
669 Ok(users)
670 } else {
671 Ok(Vec::new())
672 }
673 })
674 }
675
676 pub fn set_participant_indices(
677 &mut self,
678 participant_indices: HashMap<u64, ParticipantIndex>,
679 cx: &mut ModelContext<Self>,
680 ) {
681 if participant_indices != self.participant_indices {
682 self.participant_indices = participant_indices;
683 cx.emit(Event::ParticipantIndicesChanged);
684 }
685 }
686
687 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
688 &self.participant_indices
689 }
690
691 pub fn participant_names(
692 &self,
693 user_ids: impl Iterator<Item = u64>,
694 cx: &AppContext,
695 ) -> HashMap<u64, SharedString> {
696 let mut ret = HashMap::default();
697 let mut missing_user_ids = Vec::new();
698 for id in user_ids {
699 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
700 ret.insert(id, github_login.into());
701 } else {
702 missing_user_ids.push(id)
703 }
704 }
705 if !missing_user_ids.is_empty() {
706 let this = self.weak_self.clone();
707 cx.spawn(|mut cx| async move {
708 this.update(&mut cx, |this, cx| this.get_users(missing_user_ids, cx))?
709 .await
710 })
711 .detach_and_log_err(cx);
712 }
713 ret
714 }
715}
716
717impl User {
718 fn new(message: proto::User) -> Arc<Self> {
719 Arc::new(User {
720 id: message.id,
721 github_login: message.github_login,
722 avatar_uri: message.avatar_url.into(),
723 })
724 }
725}
726
727impl Contact {
728 async fn from_proto(
729 contact: proto::Contact,
730 user_store: &Model<UserStore>,
731 cx: &mut AsyncAppContext,
732 ) -> Result<Self> {
733 let user = user_store
734 .update(cx, |user_store, cx| {
735 user_store.get_user(contact.user_id, cx)
736 })?
737 .await?;
738 Ok(Self {
739 user,
740 online: contact.online,
741 busy: contact.busy,
742 })
743 }
744}
745
746impl Collaborator {
747 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
748 Ok(Self {
749 peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
750 replica_id: message.replica_id as ReplicaId,
751 user_id: message.user_id as UserId,
752 })
753 }
754}