1use super::{proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context as _, Result};
3use chrono::{DateTime, Utc};
4use collections::{hash_map::Entry, HashMap, HashSet};
5use feature_flags::FeatureFlagAppExt;
6use futures::{channel::mpsc, Future, StreamExt};
7use gpui::{
8 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
9};
10use postage::{sink::Sink, watch};
11use rpc::proto::{RequestMessage, UsersResponse};
12use std::sync::{Arc, Weak};
13use text::ReplicaId;
14use util::TryFutureExt as _;
15
/// Numeric identifier of a user.
pub type UserId = u64;
17
/// Newtype wrapping the `u64` identifier of a channel.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
22
23impl std::fmt::Display for ChannelId {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 self.0.fmt(f)
26 }
27}
28
/// Newtype wrapping the `u64` identifier of a project.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);
31
/// Newtype wrapping the `u64` identifier of a dev-server project.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
36
/// Newtype wrapping a `u32` index associated with a participant.
///
/// `UserStore` keeps one of these per user id (see `participant_indices`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
39
40#[derive(Default, Debug)]
41pub struct User {
42 pub id: UserId,
43 pub github_login: String,
44 pub avatar_uri: SharedUri,
45 pub name: Option<String>,
46 pub email: Option<String>,
47}
48
/// A collaborator decoded from a `proto::Collaborator` message
/// (see [`Collaborator::from_proto`]).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id taken from the message; decoding fails when it is absent.
    pub peer_id: proto::PeerId,
    /// Replica id taken from the message.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// Host flag copied from the message.
    pub is_host: bool,
}
56
57impl PartialOrd for User {
58 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
59 Some(self.cmp(other))
60 }
61}
62
63impl Ord for User {
64 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
65 self.github_login.cmp(&other.github_login)
66 }
67}
68
69impl PartialEq for User {
70 fn eq(&self, other: &Self) -> bool {
71 self.id == other.id && self.github_login == other.github_login
72 }
73}
74
// Marker impl: the hand-written `PartialEq` above compares `u64` and `String`
// values only, so equality is reflexive.
impl Eq for User {}
76
/// A contact of the current user, built from a `proto::Contact` message.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// `online` flag copied from the server message.
    pub online: bool,
    /// `busy` flag copied from the server message.
    pub busy: bool,
}
83
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any known request.
    None,
    /// The user appears in the outgoing contact requests.
    RequestSent,
    /// The user appears in the incoming contact requests.
    RequestReceived,
    /// The user is already in the contact list.
    RequestAccepted,
}
91
/// Client-side cache of users, contacts and information about the signed-in
/// user, kept up to date by RPC message handlers and two background tasks.
pub struct UserStore {
    /// Cache of known users keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Secondary index from GitHub login to user id, maintained by `insert`.
    by_github_login: HashMap<String, u64>,
    /// Participant index per user id, replaced wholesale by
    /// `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sending half of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Plan from the most recent `UpdateUserPlan` message, if any.
    current_plan: Option<proto::Plan>,
    /// Watch channel holding the signed-in user (`None` when signed out).
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Outer `None`: acceptance state not known (initial state / signed out).
    /// Inner `None`: known, but no acceptance timestamp is available.
    accepted_tos_at: Option<Option<DateTime<Utc>>>,
    /// Contacts, kept sorted by `github_login` so they can be binary-searched.
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, sorted by `github_login`.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, sorted by `github_login`.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id; entries are removed
    /// once their count drops back to zero.
    pending_contact_requests: HashMap<u64, usize>,
    /// Invite information from the most recent `UpdateInviteInfo` message.
    invite_info: Option<InviteInfo>,
    /// Weak reference to the RPC client; upgraded on demand.
    client: Weak<Client>,
    /// Task applying queued contact updates one at a time.
    _maintain_contacts: Task<()>,
    /// Task reacting to client status changes.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this entity, used to spawn work from `&self` methods that
    /// only receive an `&App`.
    weak_self: WeakEntity<Self>,
}
110
/// Invite information received through a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// `count` field of the message.
    pub count: u32,
    /// `url` field of the message.
    pub url: Arc<str>,
}
116
/// Events emitted by [`UserStore`].
pub enum Event {
    /// Something happened to a contact request involving `user`.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted when `set_participant_indices` stores a different map.
    ParticipantIndicesChanged,
    /// Emitted when the terms-of-service state of the current user is updated
    /// or reset.
    PrivateUserInfoUpdated,
}
126
/// What happened to a contact request in an [`Event::Contact`].
///
/// Within this file only `Cancelled` is emitted (when an incoming request is
/// removed); the other variants are presumably produced or consumed elsewhere.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    Cancelled,
}
133
// Declares that `UserStore` emits `Event` values (via `cx.emit`).
impl EventEmitter<Event> for UserStore {}
135
/// Messages processed, in order, by the `_maintain_contacts` task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// No-op marker: the barrier sender is dropped once every earlier message
    /// has been processed.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
141
142impl UserStore {
/// Creates the store, registers its RPC message handlers on `client`, and
/// spawns the two background tasks that maintain contacts and the current
/// user.
///
/// Only a weak reference to `client` is retained by the store and its tasks.
pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
    let (mut current_user_tx, current_user_rx) = watch::channel();
    let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
    let rpc_subscriptions = vec![
        client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
        client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
        client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
        client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
    ];
    Self {
        users: Default::default(),
        by_github_login: Default::default(),
        current_user: current_user_rx,
        current_plan: None,
        accepted_tos_at: None,
        contacts: Default::default(),
        incoming_contact_requests: Default::default(),
        participant_indices: Default::default(),
        outgoing_contact_requests: Default::default(),
        invite_info: None,
        client: Arc::downgrade(&client),
        update_contacts_tx,
        // Drains the contact-update queue one message at a time, so updates are
        // applied in the order they were enqueued.
        _maintain_contacts: cx.spawn(|this, mut cx| async move {
            // Moved into the task so the subscriptions live as long as it does.
            let _subscriptions = rpc_subscriptions;
            while let Some(message) = update_contacts_rx.next().await {
                if let Ok(task) =
                    this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
                {
                    // Failures are logged; the loop keeps going.
                    task.log_err().await;
                } else {
                    // The store can no longer be updated: stop.
                    break;
                }
            }
        }),
        // Reacts to every client status change.
        _maintain_current_user: cx.spawn(|this, mut cx| async move {
            let mut status = client.status();
            // Keep only a weak reference so this task does not keep the client alive.
            let weak = Arc::downgrade(&client);
            drop(client);
            while let Some(status) = status.next().await {
                // if the client is dropped, the app is shutting down.
                let Some(client) = weak.upgrade() else {
                    return Ok(());
                };
                match status {
                    Status::Connected { .. } => {
                        if let Some(user_id) = client.user_id() {
                            let fetch_user = if let Ok(fetch_user) = this
                                .update(&mut cx, |this, cx| {
                                    this.get_user(user_id, cx).log_err()
                                }) {
                                fetch_user
                            } else {
                                break;
                            };
                            let fetch_private_user_info =
                                client.request(proto::GetPrivateUserInfo {}).log_err();
                            // Both requests run concurrently; each yields `None`
                            // (after logging) on failure.
                            let (user, info) =
                                futures::join!(fetch_user, fetch_private_user_info);

                            cx.update(|cx| {
                                if let Some(info) = info {
                                    let staff =
                                        info.staff && !*feature_flags::ZED_DISABLE_STAFF;
                                    cx.update_flags(staff, info.flags);
                                    client.telemetry.set_authenticated_user_info(
                                        Some(info.metrics_id.clone()),
                                        staff,
                                    );

                                    this.update(cx, |this, cx| {
                                        let accepted_tos_at = {
                                            // Debug builds only: setting
                                            // ZED_IGNORE_ACCEPTED_TOS discards the
                                            // acceptance timestamp from the server.
                                            #[cfg(debug_assertions)]
                                            if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
                                            {
                                                None
                                            } else {
                                                info.accepted_tos_at
                                            }

                                            #[cfg(not(debug_assertions))]
                                            info.accepted_tos_at
                                        };

                                        this.set_current_user_accepted_tos_at(accepted_tos_at);
                                        cx.emit(Event::PrivateUserInfoUpdated);
                                    })
                                } else {
                                    anyhow::Ok(())
                                }
                            })??;

                            // Published even when the private info request failed;
                            // `user` is `None` if fetching the user failed.
                            current_user_tx.send(user).await.ok();

                            this.update(&mut cx, |_, cx| cx.notify())?;
                        }
                    }
                    Status::SignedOut => {
                        current_user_tx.send(None).await.ok();
                        this.update(&mut cx, |this, cx| {
                            // Back to "unknown" until the next sign-in.
                            this.accepted_tos_at = None;
                            cx.emit(Event::PrivateUserInfoUpdated);
                            cx.notify();
                            this.clear_contacts()
                        })?
                        .await;
                    }
                    Status::ConnectionLost => {
                        this.update(&mut cx, |this, cx| {
                            cx.notify();
                            this.clear_contacts()
                        })?
                        .await;
                    }
                    // All other statuses are ignored.
                    _ => {}
                }
            }
            Ok(())
        }),
        pending_contact_requests: Default::default(),
        weak_self: cx.weak_entity(),
    }
}
265
/// Test helper: forgets every cached user, including the GitHub-login index.
#[cfg(feature = "test-support")]
pub fn clear_cache(&mut self) {
    self.by_github_login.clear();
    self.users.clear();
}
271
272 async fn handle_update_invite_info(
273 this: Entity<Self>,
274 message: TypedEnvelope<proto::UpdateInviteInfo>,
275 mut cx: AsyncApp,
276 ) -> Result<()> {
277 this.update(&mut cx, |this, cx| {
278 this.invite_info = Some(InviteInfo {
279 url: Arc::from(message.payload.url),
280 count: message.payload.count,
281 });
282 cx.notify();
283 })?;
284 Ok(())
285 }
286
287 async fn handle_show_contacts(
288 this: Entity<Self>,
289 _: TypedEnvelope<proto::ShowContacts>,
290 mut cx: AsyncApp,
291 ) -> Result<()> {
292 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
293 Ok(())
294 }
295
296 pub fn invite_info(&self) -> Option<&InviteInfo> {
297 self.invite_info.as_ref()
298 }
299
300 async fn handle_update_contacts(
301 this: Entity<Self>,
302 message: TypedEnvelope<proto::UpdateContacts>,
303 mut cx: AsyncApp,
304 ) -> Result<()> {
305 this.update(&mut cx, |this, _| {
306 this.update_contacts_tx
307 .unbounded_send(UpdateContacts::Update(message.payload))
308 .unwrap();
309 })?;
310 Ok(())
311 }
312
313 async fn handle_update_plan(
314 this: Entity<Self>,
315 message: TypedEnvelope<proto::UpdateUserPlan>,
316 mut cx: AsyncApp,
317 ) -> Result<()> {
318 this.update(&mut cx, |this, cx| {
319 this.current_plan = Some(message.payload.plan());
320 cx.notify();
321 })?;
322 Ok(())
323 }
324
/// Applies one queued [`UpdateContacts`] message.
///
/// `Wait` and `Clear` complete immediately. `Update` first loads every user
/// referenced by the message, then merges the changes into the contact list
/// and both request lists (all kept sorted by `github_login`) and notifies
/// observers.
fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
    match message {
        UpdateContacts::Wait(barrier) => {
            // Dropping the sender marks this point of the queue as reached
            // (presumably what releases the waiting receiver — confirm against
            // the postage barrier docs).
            drop(barrier);
            Task::ready(Ok(()))
        }
        UpdateContacts::Clear(barrier) => {
            self.contacts.clear();
            self.incoming_contact_requests.clear();
            self.outgoing_contact_requests.clear();
            drop(barrier);
            Task::ready(Ok(()))
        }
        UpdateContacts::Update(message) => {
            // Collect every user id mentioned by the message (deduplicated) so
            // they can be loaded with a single call.
            let mut user_ids = HashSet::default();
            for contact in &message.contacts {
                user_ids.insert(contact.user_id);
            }
            user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
            user_ids.extend(message.outgoing_requests.iter());

            let load_users = self.get_users(user_ids.into_iter().collect(), cx);
            cx.spawn(|this, mut cx| async move {
                load_users.await?;

                // The users were already fetched and cached by the `get_users`
                // call above, so the lookups below can simply run sequentially.
                let mut updated_contacts = Vec::new();
                let this = this
                    .upgrade()
                    .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
                for contact in message.contacts {
                    updated_contacts.push(Arc::new(
                        Contact::from_proto(contact, &this, &mut cx).await?,
                    ));
                }

                let mut incoming_requests = Vec::new();
                for request in message.incoming_requests {
                    incoming_requests.push({
                        this.update(&mut cx, |this, cx| {
                            this.get_user(request.requester_id, cx)
                        })?
                        .await?
                    });
                }

                let mut outgoing_requests = Vec::new();
                for requested_user_id in message.outgoing_requests {
                    outgoing_requests.push(
                        this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
                            .await?,
                    );
                }

                let removed_contacts =
                    HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                let removed_incoming_requests =
                    HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                let removed_outgoing_requests =
                    HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                this.update(&mut cx, |this, cx| {
                    // Remove contacts
                    this.contacts
                        .retain(|contact| !removed_contacts.contains(&contact.user.id));
                    // Update existing contacts and insert new ones
                    for updated_contact in updated_contacts {
                        match this.contacts.binary_search_by_key(
                            &&updated_contact.user.github_login,
                            |contact| &contact.user.github_login,
                        ) {
                            Ok(ix) => this.contacts[ix] = updated_contact,
                            Err(ix) => this.contacts.insert(ix, updated_contact),
                        }
                    }

                    // Remove incoming contact requests, emitting a `Cancelled`
                    // event for each one that goes away
                    this.incoming_contact_requests.retain(|user| {
                        if removed_incoming_requests.contains(&user.id) {
                            cx.emit(Event::Contact {
                                user: user.clone(),
                                kind: ContactEventKind::Cancelled,
                            });
                            false
                        } else {
                            true
                        }
                    });
                    // Update existing incoming requests and insert new ones
                    for user in incoming_requests {
                        match this
                            .incoming_contact_requests
                            .binary_search_by_key(&&user.github_login, |contact| {
                                &contact.github_login
                            }) {
                            Ok(ix) => this.incoming_contact_requests[ix] = user,
                            Err(ix) => this.incoming_contact_requests.insert(ix, user),
                        }
                    }

                    // Remove outgoing contact requests
                    this.outgoing_contact_requests
                        .retain(|user| !removed_outgoing_requests.contains(&user.id));
                    // Update existing outgoing requests and insert new ones
                    for request in outgoing_requests {
                        match this
                            .outgoing_contact_requests
                            .binary_search_by_key(&&request.github_login, |contact| {
                                &contact.github_login
                            }) {
                            Ok(ix) => this.outgoing_contact_requests[ix] = request,
                            Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                        }
                    }

                    cx.notify();
                })?;

                Ok(())
            })
        }
    }
}
449
450 pub fn contacts(&self) -> &[Arc<Contact>] {
451 &self.contacts
452 }
453
454 pub fn has_contact(&self, user: &Arc<User>) -> bool {
455 self.contacts
456 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
457 .is_ok()
458 }
459
460 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
461 &self.incoming_contact_requests
462 }
463
464 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
465 &self.outgoing_contact_requests
466 }
467
468 pub fn is_contact_request_pending(&self, user: &User) -> bool {
469 self.pending_contact_requests.contains_key(&user.id)
470 }
471
472 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
473 if self
474 .contacts
475 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
476 .is_ok()
477 {
478 ContactRequestStatus::RequestAccepted
479 } else if self
480 .outgoing_contact_requests
481 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
482 .is_ok()
483 {
484 ContactRequestStatus::RequestSent
485 } else if self
486 .incoming_contact_requests
487 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
488 .is_ok()
489 {
490 ContactRequestStatus::RequestReceived
491 } else {
492 ContactRequestStatus::None
493 }
494 }
495
496 pub fn request_contact(
497 &mut self,
498 responder_id: u64,
499 cx: &mut Context<Self>,
500 ) -> Task<Result<()>> {
501 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
502 }
503
504 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
505 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
506 }
507
508 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
509 self.incoming_contact_requests
510 .iter()
511 .any(|user| user.id == user_id)
512 }
513
514 pub fn respond_to_contact_request(
515 &mut self,
516 requester_id: u64,
517 accept: bool,
518 cx: &mut Context<Self>,
519 ) -> Task<Result<()>> {
520 self.perform_contact_request(
521 requester_id,
522 proto::RespondToContactRequest {
523 requester_id,
524 response: if accept {
525 proto::ContactRequestResponse::Accept
526 } else {
527 proto::ContactRequestResponse::Decline
528 } as i32,
529 },
530 cx,
531 )
532 }
533
534 pub fn dismiss_contact_request(
535 &self,
536 requester_id: u64,
537 cx: &Context<Self>,
538 ) -> Task<Result<()>> {
539 let client = self.client.upgrade();
540 cx.spawn(move |_, _| async move {
541 client
542 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
543 .request(proto::RespondToContactRequest {
544 requester_id,
545 response: proto::ContactRequestResponse::Dismiss as i32,
546 })
547 .await?;
548 Ok(())
549 })
550 }
551
552 fn perform_contact_request<T: RequestMessage>(
553 &mut self,
554 user_id: u64,
555 request: T,
556 cx: &mut Context<Self>,
557 ) -> Task<Result<()>> {
558 let client = self.client.upgrade();
559 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
560 cx.notify();
561
562 cx.spawn(move |this, mut cx| async move {
563 let response = client
564 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
565 .request(request)
566 .await;
567 this.update(&mut cx, |this, cx| {
568 if let Entry::Occupied(mut request_count) =
569 this.pending_contact_requests.entry(user_id)
570 {
571 *request_count.get_mut() -= 1;
572 if *request_count.get() == 0 {
573 request_count.remove();
574 }
575 }
576 cx.notify();
577 })?;
578 response?;
579 Ok(())
580 })
581 }
582
583 pub fn clear_contacts(&self) -> impl Future<Output = ()> {
584 let (tx, mut rx) = postage::barrier::channel();
585 self.update_contacts_tx
586 .unbounded_send(UpdateContacts::Clear(tx))
587 .unwrap();
588 async move {
589 rx.next().await;
590 }
591 }
592
593 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
594 let (tx, mut rx) = postage::barrier::channel();
595 self.update_contacts_tx
596 .unbounded_send(UpdateContacts::Wait(tx))
597 .unwrap();
598 async move {
599 rx.next().await;
600 }
601 }
602
603 pub fn get_users(
604 &self,
605 user_ids: Vec<u64>,
606 cx: &Context<Self>,
607 ) -> Task<Result<Vec<Arc<User>>>> {
608 let mut user_ids_to_fetch = user_ids.clone();
609 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
610
611 cx.spawn(|this, mut cx| async move {
612 if !user_ids_to_fetch.is_empty() {
613 this.update(&mut cx, |this, cx| {
614 this.load_users(
615 proto::GetUsers {
616 user_ids: user_ids_to_fetch,
617 },
618 cx,
619 )
620 })?
621 .await?;
622 }
623
624 this.update(&mut cx, |this, _| {
625 user_ids
626 .iter()
627 .map(|user_id| {
628 this.users
629 .get(user_id)
630 .cloned()
631 .ok_or_else(|| anyhow!("user {} not found", user_id))
632 })
633 .collect()
634 })?
635 })
636 }
637
638 pub fn fuzzy_search_users(
639 &self,
640 query: String,
641 cx: &Context<Self>,
642 ) -> Task<Result<Vec<Arc<User>>>> {
643 self.load_users(proto::FuzzySearchUsers { query }, cx)
644 }
645
646 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
647 self.users.get(&user_id).cloned()
648 }
649
650 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
651 if let Some(user) = self.users.get(&user_id).cloned() {
652 return Some(user);
653 }
654
655 self.get_user(user_id, cx).detach_and_log_err(cx);
656 None
657 }
658
659 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
660 if let Some(user) = self.users.get(&user_id).cloned() {
661 return Task::ready(Ok(user));
662 }
663
664 let load_users = self.get_users(vec![user_id], cx);
665 cx.spawn(move |this, mut cx| async move {
666 load_users.await?;
667 this.update(&mut cx, |this, _| {
668 this.users
669 .get(&user_id)
670 .cloned()
671 .ok_or_else(|| anyhow!("server responded with no users"))
672 })?
673 })
674 }
675
676 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
677 self.by_github_login
678 .get(github_login)
679 .and_then(|id| self.users.get(id).cloned())
680 }
681
682 pub fn current_user(&self) -> Option<Arc<User>> {
683 self.current_user.borrow().clone()
684 }
685
686 pub fn current_plan(&self) -> Option<proto::Plan> {
687 self.current_plan
688 }
689
690 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
691 self.current_user.clone()
692 }
693
694 pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
695 self.accepted_tos_at
696 .map(|accepted_tos_at| accepted_tos_at.is_some())
697 }
698
699 pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
700 if self.current_user().is_none() {
701 return Task::ready(Err(anyhow!("no current user")));
702 };
703
704 let client = self.client.clone();
705 cx.spawn(move |this, mut cx| async move {
706 if let Some(client) = client.upgrade() {
707 let response = client
708 .request(proto::AcceptTermsOfService {})
709 .await
710 .context("error accepting tos")?;
711
712 this.update(&mut cx, |this, cx| {
713 this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
714 cx.emit(Event::PrivateUserInfoUpdated);
715 })
716 } else {
717 Err(anyhow!("client not found"))
718 }
719 })
720 }
721
722 fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
723 self.accepted_tos_at = Some(
724 accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
725 );
726 }
727
728 fn load_users(
729 &self,
730 request: impl RequestMessage<Response = UsersResponse>,
731 cx: &Context<Self>,
732 ) -> Task<Result<Vec<Arc<User>>>> {
733 let client = self.client.clone();
734 cx.spawn(|this, mut cx| async move {
735 if let Some(rpc) = client.upgrade() {
736 let response = rpc.request(request).await.context("error loading users")?;
737 let users = response.users;
738
739 this.update(&mut cx, |this, _| this.insert(users))
740 } else {
741 Ok(Vec::new())
742 }
743 })
744 }
745
746 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
747 let mut ret = Vec::with_capacity(users.len());
748 for user in users {
749 let user = User::new(user);
750 if let Some(old) = self.users.insert(user.id, user.clone()) {
751 if old.github_login != user.github_login {
752 self.by_github_login.remove(&old.github_login);
753 }
754 }
755 self.by_github_login
756 .insert(user.github_login.clone(), user.id);
757 ret.push(user)
758 }
759 ret
760 }
761
762 pub fn set_participant_indices(
763 &mut self,
764 participant_indices: HashMap<u64, ParticipantIndex>,
765 cx: &mut Context<Self>,
766 ) {
767 if participant_indices != self.participant_indices {
768 self.participant_indices = participant_indices;
769 cx.emit(Event::ParticipantIndicesChanged);
770 }
771 }
772
773 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
774 &self.participant_indices
775 }
776
777 pub fn participant_names(
778 &self,
779 user_ids: impl Iterator<Item = u64>,
780 cx: &App,
781 ) -> HashMap<u64, SharedString> {
782 let mut ret = HashMap::default();
783 let mut missing_user_ids = Vec::new();
784 for id in user_ids {
785 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
786 ret.insert(id, github_login.into());
787 } else {
788 missing_user_ids.push(id)
789 }
790 }
791 if !missing_user_ids.is_empty() {
792 let this = self.weak_self.clone();
793 cx.spawn(|mut cx| async move {
794 this.update(&mut cx, |this, cx| this.get_users(missing_user_ids, cx))?
795 .await
796 })
797 .detach_and_log_err(cx);
798 }
799 ret
800 }
801}
802
803impl User {
804 fn new(message: proto::User) -> Arc<Self> {
805 Arc::new(User {
806 id: message.id,
807 github_login: message.github_login,
808 avatar_uri: message.avatar_url.into(),
809 name: message.name,
810 email: message.email,
811 })
812 }
813}
814
815impl Contact {
816 async fn from_proto(
817 contact: proto::Contact,
818 user_store: &Entity<UserStore>,
819 cx: &mut AsyncApp,
820 ) -> Result<Self> {
821 let user = user_store
822 .update(cx, |user_store, cx| {
823 user_store.get_user(contact.user_id, cx)
824 })?
825 .await?;
826 Ok(Self {
827 user,
828 online: contact.online,
829 busy: contact.busy,
830 })
831 }
832}
833
834impl Collaborator {
835 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
836 Ok(Self {
837 peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
838 replica_id: message.replica_id as ReplicaId,
839 user_id: message.user_id as UserId,
840 is_host: message.is_host,
841 })
842 }
843}