1use super::{proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context as _, Result};
3use chrono::{DateTime, Utc};
4use collections::{hash_map::Entry, HashMap, HashSet};
5use feature_flags::FeatureFlagAppExt;
6use futures::{channel::mpsc, Future, StreamExt};
7use gpui::{
8 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
9};
10use postage::{sink::Sink, watch};
11use rpc::proto::{RequestMessage, UsersResponse};
12use std::sync::{Arc, Weak};
13use text::ReplicaId;
14use util::TryFutureExt as _;
15
16pub type UserId = u64;
17
18#[derive(
19 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
20)]
21pub struct ChannelId(pub u64);
22
23impl std::fmt::Display for ChannelId {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 self.0.fmt(f)
26 }
27}
28
29#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
30pub struct ProjectId(pub u64);
31
32#[derive(
33 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
34)]
35pub struct DevServerProjectId(pub u64);
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq)]
38pub struct ParticipantIndex(pub u32);
39
40#[derive(Default, Debug)]
41pub struct User {
42 pub id: UserId,
43 pub github_login: String,
44 pub avatar_uri: SharedUri,
45 pub name: Option<String>,
46 pub email: Option<String>,
47}
48
49#[derive(Clone, Debug, PartialEq, Eq)]
50pub struct Collaborator {
51 pub peer_id: proto::PeerId,
52 pub replica_id: ReplicaId,
53 pub user_id: UserId,
54 pub is_host: bool,
55}
56
57impl PartialOrd for User {
58 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
59 Some(self.cmp(other))
60 }
61}
62
63impl Ord for User {
64 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
65 self.github_login.cmp(&other.github_login)
66 }
67}
68
69impl PartialEq for User {
70 fn eq(&self, other: &Self) -> bool {
71 self.id == other.id && self.github_login == other.github_login
72 }
73}
74
75impl Eq for User {}
76
77#[derive(Debug, PartialEq)]
78pub struct Contact {
79 pub user: Arc<User>,
80 pub online: bool,
81 pub busy: bool,
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq)]
85pub enum ContactRequestStatus {
86 None,
87 RequestSent,
88 RequestReceived,
89 RequestAccepted,
90}
91
92pub struct UserStore {
93 users: HashMap<u64, Arc<User>>,
94 by_github_login: HashMap<String, u64>,
95 participant_indices: HashMap<u64, ParticipantIndex>,
96 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
97 current_plan: Option<proto::Plan>,
98 current_user: watch::Receiver<Option<Arc<User>>>,
99 accepted_tos_at: Option<Option<DateTime<Utc>>>,
100 contacts: Vec<Arc<Contact>>,
101 incoming_contact_requests: Vec<Arc<User>>,
102 outgoing_contact_requests: Vec<Arc<User>>,
103 pending_contact_requests: HashMap<u64, usize>,
104 invite_info: Option<InviteInfo>,
105 client: Weak<Client>,
106 _maintain_contacts: Task<()>,
107 _maintain_current_user: Task<Result<()>>,
108 weak_self: WeakEntity<Self>,
109}
110
111#[derive(Clone)]
112pub struct InviteInfo {
113 pub count: u32,
114 pub url: Arc<str>,
115}
116
117pub enum Event {
118 Contact {
119 user: Arc<User>,
120 kind: ContactEventKind,
121 },
122 ShowContacts,
123 ParticipantIndicesChanged,
124 TermsStatusUpdated {
125 accepted: bool,
126 },
127}
128
129#[derive(Clone, Copy)]
130pub enum ContactEventKind {
131 Requested,
132 Accepted,
133 Cancelled,
134}
135
136impl EventEmitter<Event> for UserStore {}
137
138enum UpdateContacts {
139 Update(proto::UpdateContacts),
140 Wait(postage::barrier::Sender),
141 Clear(postage::barrier::Sender),
142}
143
144impl UserStore {
145 pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
146 let (mut current_user_tx, current_user_rx) = watch::channel();
147 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
148 let rpc_subscriptions = vec![
149 client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
150 client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
151 client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
152 client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
153 ];
154 Self {
155 users: Default::default(),
156 by_github_login: Default::default(),
157 current_user: current_user_rx,
158 current_plan: None,
159 accepted_tos_at: None,
160 contacts: Default::default(),
161 incoming_contact_requests: Default::default(),
162 participant_indices: Default::default(),
163 outgoing_contact_requests: Default::default(),
164 invite_info: None,
165 client: Arc::downgrade(&client),
166 update_contacts_tx,
167 _maintain_contacts: cx.spawn(|this, mut cx| async move {
168 let _subscriptions = rpc_subscriptions;
169 while let Some(message) = update_contacts_rx.next().await {
170 if let Ok(task) =
171 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
172 {
173 task.log_err().await;
174 } else {
175 break;
176 }
177 }
178 }),
179 _maintain_current_user: cx.spawn(|this, mut cx| async move {
180 let mut status = client.status();
181 let weak = Arc::downgrade(&client);
182 drop(client);
183 while let Some(status) = status.next().await {
184 // if the client is dropped, the app is shutting down.
185 let Some(client) = weak.upgrade() else {
186 return Ok(());
187 };
188 match status {
189 Status::Connected { .. } => {
190 if let Some(user_id) = client.user_id() {
191 let fetch_user = if let Ok(fetch_user) = this
192 .update(&mut cx, |this, cx| {
193 this.get_user(user_id, cx).log_err()
194 }) {
195 fetch_user
196 } else {
197 break;
198 };
199 let fetch_private_user_info =
200 client.request(proto::GetPrivateUserInfo {}).log_err();
201 let (user, info) =
202 futures::join!(fetch_user, fetch_private_user_info);
203
204 cx.update(|cx| {
205 if let Some(info) = info {
206 let disable_staff = std::env::var("ZED_DISABLE_STAFF")
207 .map_or(false, |v| !v.is_empty() && v != "0");
208 let staff = info.staff && !disable_staff;
209 cx.update_flags(staff, info.flags);
210 client.telemetry.set_authenticated_user_info(
211 Some(info.metrics_id.clone()),
212 staff,
213 );
214
215 this.update(cx, |this, cx| {
216 let accepted_tos_at = {
217 #[cfg(debug_assertions)]
218 if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
219 {
220 None
221 } else {
222 info.accepted_tos_at
223 }
224
225 #[cfg(not(debug_assertions))]
226 info.accepted_tos_at
227 };
228
229 this.set_current_user_accepted_tos_at(accepted_tos_at);
230 cx.emit(Event::TermsStatusUpdated {
231 accepted: accepted_tos_at.is_some(),
232 });
233 })
234 } else {
235 anyhow::Ok(())
236 }
237 })??;
238
239 current_user_tx.send(user).await.ok();
240
241 this.update(&mut cx, |_, cx| cx.notify())?;
242 }
243 }
244 Status::SignedOut => {
245 current_user_tx.send(None).await.ok();
246 this.update(&mut cx, |this, cx| {
247 cx.notify();
248 this.clear_contacts()
249 })?
250 .await;
251 }
252 Status::ConnectionLost => {
253 this.update(&mut cx, |this, cx| {
254 cx.notify();
255 this.clear_contacts()
256 })?
257 .await;
258 }
259 _ => {}
260 }
261 }
262 Ok(())
263 }),
264 pending_contact_requests: Default::default(),
265 weak_self: cx.weak_entity(),
266 }
267 }
268
269 #[cfg(feature = "test-support")]
270 pub fn clear_cache(&mut self) {
271 self.users.clear();
272 self.by_github_login.clear();
273 }
274
275 async fn handle_update_invite_info(
276 this: Entity<Self>,
277 message: TypedEnvelope<proto::UpdateInviteInfo>,
278 mut cx: AsyncApp,
279 ) -> Result<()> {
280 this.update(&mut cx, |this, cx| {
281 this.invite_info = Some(InviteInfo {
282 url: Arc::from(message.payload.url),
283 count: message.payload.count,
284 });
285 cx.notify();
286 })?;
287 Ok(())
288 }
289
290 async fn handle_show_contacts(
291 this: Entity<Self>,
292 _: TypedEnvelope<proto::ShowContacts>,
293 mut cx: AsyncApp,
294 ) -> Result<()> {
295 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
296 Ok(())
297 }
298
299 pub fn invite_info(&self) -> Option<&InviteInfo> {
300 self.invite_info.as_ref()
301 }
302
303 async fn handle_update_contacts(
304 this: Entity<Self>,
305 message: TypedEnvelope<proto::UpdateContacts>,
306 mut cx: AsyncApp,
307 ) -> Result<()> {
308 this.update(&mut cx, |this, _| {
309 this.update_contacts_tx
310 .unbounded_send(UpdateContacts::Update(message.payload))
311 .unwrap();
312 })?;
313 Ok(())
314 }
315
316 async fn handle_update_plan(
317 this: Entity<Self>,
318 message: TypedEnvelope<proto::UpdateUserPlan>,
319 mut cx: AsyncApp,
320 ) -> Result<()> {
321 this.update(&mut cx, |this, cx| {
322 this.current_plan = Some(message.payload.plan());
323 cx.notify();
324 })?;
325 Ok(())
326 }
327
328 fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
329 match message {
330 UpdateContacts::Wait(barrier) => {
331 drop(barrier);
332 Task::ready(Ok(()))
333 }
334 UpdateContacts::Clear(barrier) => {
335 self.contacts.clear();
336 self.incoming_contact_requests.clear();
337 self.outgoing_contact_requests.clear();
338 drop(barrier);
339 Task::ready(Ok(()))
340 }
341 UpdateContacts::Update(message) => {
342 let mut user_ids = HashSet::default();
343 for contact in &message.contacts {
344 user_ids.insert(contact.user_id);
345 }
346 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
347 user_ids.extend(message.outgoing_requests.iter());
348
349 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
350 cx.spawn(|this, mut cx| async move {
351 load_users.await?;
352
353 // Users are fetched in parallel above and cached in call to get_users
354 // No need to parallelize here
355 let mut updated_contacts = Vec::new();
356 let this = this
357 .upgrade()
358 .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
359 for contact in message.contacts {
360 updated_contacts.push(Arc::new(
361 Contact::from_proto(contact, &this, &mut cx).await?,
362 ));
363 }
364
365 let mut incoming_requests = Vec::new();
366 for request in message.incoming_requests {
367 incoming_requests.push({
368 this.update(&mut cx, |this, cx| {
369 this.get_user(request.requester_id, cx)
370 })?
371 .await?
372 });
373 }
374
375 let mut outgoing_requests = Vec::new();
376 for requested_user_id in message.outgoing_requests {
377 outgoing_requests.push(
378 this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
379 .await?,
380 );
381 }
382
383 let removed_contacts =
384 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
385 let removed_incoming_requests =
386 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
387 let removed_outgoing_requests =
388 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
389
390 this.update(&mut cx, |this, cx| {
391 // Remove contacts
392 this.contacts
393 .retain(|contact| !removed_contacts.contains(&contact.user.id));
394 // Update existing contacts and insert new ones
395 for updated_contact in updated_contacts {
396 match this.contacts.binary_search_by_key(
397 &&updated_contact.user.github_login,
398 |contact| &contact.user.github_login,
399 ) {
400 Ok(ix) => this.contacts[ix] = updated_contact,
401 Err(ix) => this.contacts.insert(ix, updated_contact),
402 }
403 }
404
405 // Remove incoming contact requests
406 this.incoming_contact_requests.retain(|user| {
407 if removed_incoming_requests.contains(&user.id) {
408 cx.emit(Event::Contact {
409 user: user.clone(),
410 kind: ContactEventKind::Cancelled,
411 });
412 false
413 } else {
414 true
415 }
416 });
417 // Update existing incoming requests and insert new ones
418 for user in incoming_requests {
419 match this
420 .incoming_contact_requests
421 .binary_search_by_key(&&user.github_login, |contact| {
422 &contact.github_login
423 }) {
424 Ok(ix) => this.incoming_contact_requests[ix] = user,
425 Err(ix) => this.incoming_contact_requests.insert(ix, user),
426 }
427 }
428
429 // Remove outgoing contact requests
430 this.outgoing_contact_requests
431 .retain(|user| !removed_outgoing_requests.contains(&user.id));
432 // Update existing incoming requests and insert new ones
433 for request in outgoing_requests {
434 match this
435 .outgoing_contact_requests
436 .binary_search_by_key(&&request.github_login, |contact| {
437 &contact.github_login
438 }) {
439 Ok(ix) => this.outgoing_contact_requests[ix] = request,
440 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
441 }
442 }
443
444 cx.notify();
445 })?;
446
447 Ok(())
448 })
449 }
450 }
451 }
452
453 pub fn contacts(&self) -> &[Arc<Contact>] {
454 &self.contacts
455 }
456
457 pub fn has_contact(&self, user: &Arc<User>) -> bool {
458 self.contacts
459 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
460 .is_ok()
461 }
462
463 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
464 &self.incoming_contact_requests
465 }
466
467 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
468 &self.outgoing_contact_requests
469 }
470
471 pub fn is_contact_request_pending(&self, user: &User) -> bool {
472 self.pending_contact_requests.contains_key(&user.id)
473 }
474
475 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
476 if self
477 .contacts
478 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
479 .is_ok()
480 {
481 ContactRequestStatus::RequestAccepted
482 } else if self
483 .outgoing_contact_requests
484 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
485 .is_ok()
486 {
487 ContactRequestStatus::RequestSent
488 } else if self
489 .incoming_contact_requests
490 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
491 .is_ok()
492 {
493 ContactRequestStatus::RequestReceived
494 } else {
495 ContactRequestStatus::None
496 }
497 }
498
499 pub fn request_contact(
500 &mut self,
501 responder_id: u64,
502 cx: &mut Context<Self>,
503 ) -> Task<Result<()>> {
504 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
505 }
506
507 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
508 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
509 }
510
511 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
512 self.incoming_contact_requests
513 .iter()
514 .any(|user| user.id == user_id)
515 }
516
517 pub fn respond_to_contact_request(
518 &mut self,
519 requester_id: u64,
520 accept: bool,
521 cx: &mut Context<Self>,
522 ) -> Task<Result<()>> {
523 self.perform_contact_request(
524 requester_id,
525 proto::RespondToContactRequest {
526 requester_id,
527 response: if accept {
528 proto::ContactRequestResponse::Accept
529 } else {
530 proto::ContactRequestResponse::Decline
531 } as i32,
532 },
533 cx,
534 )
535 }
536
537 pub fn dismiss_contact_request(
538 &self,
539 requester_id: u64,
540 cx: &Context<Self>,
541 ) -> Task<Result<()>> {
542 let client = self.client.upgrade();
543 cx.spawn(move |_, _| async move {
544 client
545 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
546 .request(proto::RespondToContactRequest {
547 requester_id,
548 response: proto::ContactRequestResponse::Dismiss as i32,
549 })
550 .await?;
551 Ok(())
552 })
553 }
554
555 fn perform_contact_request<T: RequestMessage>(
556 &mut self,
557 user_id: u64,
558 request: T,
559 cx: &mut Context<Self>,
560 ) -> Task<Result<()>> {
561 let client = self.client.upgrade();
562 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
563 cx.notify();
564
565 cx.spawn(move |this, mut cx| async move {
566 let response = client
567 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
568 .request(request)
569 .await;
570 this.update(&mut cx, |this, cx| {
571 if let Entry::Occupied(mut request_count) =
572 this.pending_contact_requests.entry(user_id)
573 {
574 *request_count.get_mut() -= 1;
575 if *request_count.get() == 0 {
576 request_count.remove();
577 }
578 }
579 cx.notify();
580 })?;
581 response?;
582 Ok(())
583 })
584 }
585
586 pub fn clear_contacts(&self) -> impl Future<Output = ()> {
587 let (tx, mut rx) = postage::barrier::channel();
588 self.update_contacts_tx
589 .unbounded_send(UpdateContacts::Clear(tx))
590 .unwrap();
591 async move {
592 rx.next().await;
593 }
594 }
595
596 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
597 let (tx, mut rx) = postage::barrier::channel();
598 self.update_contacts_tx
599 .unbounded_send(UpdateContacts::Wait(tx))
600 .unwrap();
601 async move {
602 rx.next().await;
603 }
604 }
605
606 pub fn get_users(
607 &self,
608 user_ids: Vec<u64>,
609 cx: &Context<Self>,
610 ) -> Task<Result<Vec<Arc<User>>>> {
611 let mut user_ids_to_fetch = user_ids.clone();
612 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
613
614 cx.spawn(|this, mut cx| async move {
615 if !user_ids_to_fetch.is_empty() {
616 this.update(&mut cx, |this, cx| {
617 this.load_users(
618 proto::GetUsers {
619 user_ids: user_ids_to_fetch,
620 },
621 cx,
622 )
623 })?
624 .await?;
625 }
626
627 this.update(&mut cx, |this, _| {
628 user_ids
629 .iter()
630 .map(|user_id| {
631 this.users
632 .get(user_id)
633 .cloned()
634 .ok_or_else(|| anyhow!("user {} not found", user_id))
635 })
636 .collect()
637 })?
638 })
639 }
640
641 pub fn fuzzy_search_users(
642 &self,
643 query: String,
644 cx: &Context<Self>,
645 ) -> Task<Result<Vec<Arc<User>>>> {
646 self.load_users(proto::FuzzySearchUsers { query }, cx)
647 }
648
649 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
650 self.users.get(&user_id).cloned()
651 }
652
653 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
654 if let Some(user) = self.users.get(&user_id).cloned() {
655 return Some(user);
656 }
657
658 self.get_user(user_id, cx).detach_and_log_err(cx);
659 None
660 }
661
662 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
663 if let Some(user) = self.users.get(&user_id).cloned() {
664 return Task::ready(Ok(user));
665 }
666
667 let load_users = self.get_users(vec![user_id], cx);
668 cx.spawn(move |this, mut cx| async move {
669 load_users.await?;
670 this.update(&mut cx, |this, _| {
671 this.users
672 .get(&user_id)
673 .cloned()
674 .ok_or_else(|| anyhow!("server responded with no users"))
675 })?
676 })
677 }
678
679 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
680 self.by_github_login
681 .get(github_login)
682 .and_then(|id| self.users.get(id).cloned())
683 }
684
685 pub fn current_user(&self) -> Option<Arc<User>> {
686 self.current_user.borrow().clone()
687 }
688
689 pub fn current_plan(&self) -> Option<proto::Plan> {
690 self.current_plan
691 }
692
693 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
694 self.current_user.clone()
695 }
696
697 pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
698 self.accepted_tos_at
699 .map(|accepted_tos_at| accepted_tos_at.is_some())
700 }
701
702 pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
703 if self.current_user().is_none() {
704 return Task::ready(Err(anyhow!("no current user")));
705 };
706
707 let client = self.client.clone();
708 cx.spawn(move |this, mut cx| async move {
709 if let Some(client) = client.upgrade() {
710 let response = client
711 .request(proto::AcceptTermsOfService {})
712 .await
713 .context("error accepting tos")?;
714
715 this.update(&mut cx, |this, cx| {
716 this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
717 cx.emit(Event::TermsStatusUpdated { accepted: true });
718 })
719 } else {
720 Err(anyhow!("client not found"))
721 }
722 })
723 }
724
725 fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
726 self.accepted_tos_at = Some(
727 accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
728 );
729 }
730
731 fn load_users(
732 &self,
733 request: impl RequestMessage<Response = UsersResponse>,
734 cx: &Context<Self>,
735 ) -> Task<Result<Vec<Arc<User>>>> {
736 let client = self.client.clone();
737 cx.spawn(|this, mut cx| async move {
738 if let Some(rpc) = client.upgrade() {
739 let response = rpc.request(request).await.context("error loading users")?;
740 let users = response.users;
741
742 this.update(&mut cx, |this, _| this.insert(users))
743 } else {
744 Ok(Vec::new())
745 }
746 })
747 }
748
749 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
750 let mut ret = Vec::with_capacity(users.len());
751 for user in users {
752 let user = User::new(user);
753 if let Some(old) = self.users.insert(user.id, user.clone()) {
754 if old.github_login != user.github_login {
755 self.by_github_login.remove(&old.github_login);
756 }
757 }
758 self.by_github_login
759 .insert(user.github_login.clone(), user.id);
760 ret.push(user)
761 }
762 ret
763 }
764
765 pub fn set_participant_indices(
766 &mut self,
767 participant_indices: HashMap<u64, ParticipantIndex>,
768 cx: &mut Context<Self>,
769 ) {
770 if participant_indices != self.participant_indices {
771 self.participant_indices = participant_indices;
772 cx.emit(Event::ParticipantIndicesChanged);
773 }
774 }
775
776 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
777 &self.participant_indices
778 }
779
780 pub fn participant_names(
781 &self,
782 user_ids: impl Iterator<Item = u64>,
783 cx: &App,
784 ) -> HashMap<u64, SharedString> {
785 let mut ret = HashMap::default();
786 let mut missing_user_ids = Vec::new();
787 for id in user_ids {
788 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
789 ret.insert(id, github_login.into());
790 } else {
791 missing_user_ids.push(id)
792 }
793 }
794 if !missing_user_ids.is_empty() {
795 let this = self.weak_self.clone();
796 cx.spawn(|mut cx| async move {
797 this.update(&mut cx, |this, cx| this.get_users(missing_user_ids, cx))?
798 .await
799 })
800 .detach_and_log_err(cx);
801 }
802 ret
803 }
804}
805
806impl User {
807 fn new(message: proto::User) -> Arc<Self> {
808 Arc::new(User {
809 id: message.id,
810 github_login: message.github_login,
811 avatar_uri: message.avatar_url.into(),
812 name: message.name,
813 email: message.email,
814 })
815 }
816}
817
818impl Contact {
819 async fn from_proto(
820 contact: proto::Contact,
821 user_store: &Entity<UserStore>,
822 cx: &mut AsyncApp,
823 ) -> Result<Self> {
824 let user = user_store
825 .update(cx, |user_store, cx| {
826 user_store.get_user(contact.user_id, cx)
827 })?
828 .await?;
829 Ok(Self {
830 user,
831 online: contact.online,
832 busy: contact.busy,
833 })
834 }
835}
836
837impl Collaborator {
838 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
839 Ok(Self {
840 peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
841 replica_id: message.replica_id as ReplicaId,
842 user_id: message.user_id as UserId,
843 is_host: message.is_host,
844 })
845 }
846}