1use super::{proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use chrono::{DateTime, Utc};
4use collections::{hash_map::Entry, HashMap, HashSet};
5use feature_flags::FeatureFlagAppExt;
6use futures::{channel::mpsc, Future, StreamExt};
7use gpui::{
8 AppContext, AsyncAppContext, EventEmitter, Model, ModelContext, SharedString, SharedUri, Task,
9 WeakModel,
10};
11use postage::{sink::Sink, watch};
12use rpc::proto::{RequestMessage, UsersResponse};
13use std::sync::{Arc, Weak};
14use text::ReplicaId;
15use util::TryFutureExt as _;
16
/// Numeric identifier of a user; a plain alias for `u64`.
pub type UserId = u64;
18
/// Newtype around the `u64` identifier of a channel.
///
/// Serializable, hashable and totally ordered by the wrapped value.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
23
24impl std::fmt::Display for ChannelId {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 self.0.fmt(f)
27 }
28}
29
/// Newtype around the `u64` identifier of a project.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);
32
/// Newtype around the `u64` identifier of a dev-server project.
///
/// Unlike [`ProjectId`], this id is also (de)serializable via serde.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
37
/// Newtype around a `u32` index assigned to a participant.
///
/// `UserStore` keeps a map from user id to this index
/// (see `UserStore::set_participant_indices`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
40
/// A user as known to the client, built from a `proto::User` message.
///
/// Equality and ordering are implemented by hand elsewhere in this file
/// rather than derived, so they do not consider every field.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric user id.
    pub id: UserId,
    /// GitHub login; also the key the contact lists in `UserStore` are sorted by.
    pub github_login: String,
    /// URI of the avatar image (converted from the proto `avatar_url`).
    pub avatar_uri: SharedUri,
    /// Optional name, copied verbatim from the proto message.
    pub name: Option<String>,
    /// Optional email address, copied verbatim from the proto message.
    pub email: Option<String>,
}
49
/// A collaborator, decoded from a `proto::Collaborator` message
/// (see `Collaborator::from_proto`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id taken from the proto message (required there).
    pub peer_id: proto::PeerId,
    /// Replica id taken from the proto message.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// The proto message's `is_host` flag.
    pub is_host: bool,
}
57
impl PartialOrd for User {
    /// Always returns `Some`: the ordering is total and delegated to [`Ord::cmp`].
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
63
64impl Ord for User {
65 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
66 self.github_login.cmp(&other.github_login)
67 }
68}
69
70impl PartialEq for User {
71 fn eq(&self, other: &Self) -> bool {
72 self.id == other.id && self.github_login == other.github_login
73 }
74}
75
// Marker impl: the hand-written `PartialEq` above is a full equivalence relation.
impl Eq for User {}
77
/// A contact of the current user together with the presence flags reported
/// by the server (see `Contact::from_proto`).
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The contact's user record.
    pub user: Arc<User>,
    /// The proto contact's `online` flag.
    pub online: bool,
    /// The proto contact's `busy` flag.
    pub busy: bool,
}
84
/// Relationship between the current user and another user, as computed by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the contact lists.
    None,
    /// The user is in the outgoing-requests list.
    RequestSent,
    /// The user is in the incoming-requests list.
    RequestReceived,
    /// The user is already in the contacts list.
    RequestAccepted,
}
92
/// Client-side cache of users plus the current user's contacts, contact
/// requests, plan, invite info and terms-of-service state.
pub struct UserStore {
    /// Cache of every user loaded so far, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Secondary index from GitHub login to user id, maintained by `insert`.
    by_github_login: HashMap<String, u64>,
    /// User id -> participant index, replaced wholesale by `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sender feeding the `_maintain_contacts` task, which applies the
    /// messages one at a time in the order they were sent.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Plan from the most recent `proto::UpdateUserPlan` message, if any.
    current_plan: Option<proto::Plan>,
    /// Watch channel holding the signed-in user; written by `_maintain_current_user`.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Outer `None`: terms status not known yet. `Some(None)`: known, not
    /// accepted. `Some(Some(t))`: accepted at time `t`.
    accepted_tos_at: Option<Option<DateTime<Utc>>>,
    /// Contacts, kept sorted by GitHub login (lookups use binary search).
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, kept sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, kept sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// User id -> number of contact-related requests currently in flight.
    pending_contact_requests: HashMap<u64, usize>,
    /// Invite info from the most recent `proto::UpdateInviteInfo` message, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the RPC client; upgraded on demand before each request.
    client: Weak<Client>,
    /// Task draining `update_contacts_tx`; it also owns the RPC subscriptions.
    _maintain_contacts: Task<()>,
    /// Task reacting to client status changes (connect, sign-out, connection loss).
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this model, used by `participant_names`, which only has
    /// an `AppContext` available.
    weak_self: WeakModel<Self>,
}
111
/// Invite information copied from a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The message's `count` field.
    pub count: u32,
    /// The message's `url` field.
    pub url: Arc<str>,
}
117
/// Events emitted by [`UserStore`].
pub enum Event {
    /// Something happened to a contact request involving `user`.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// The server sent a `proto::ShowContacts` message.
    ShowContacts,
    /// `set_participant_indices` was called with a map that differs from the stored one.
    ParticipantIndicesChanged,
    /// The terms-of-service status was loaded or changed.
    TermsStatusUpdated {
        /// Whether the current user has accepted the terms.
        accepted: bool,
    },
}
129
/// What happened to a contact request in an [`Event::Contact`].
///
/// NOTE(review): within this file only `Cancelled` is ever emitted (when an
/// incoming request is removed); confirm whether the other variants are still
/// produced or consumed elsewhere.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    Cancelled,
}
136
// Lets `UserStore` emit `Event` values through its `ModelContext`.
impl EventEmitter<Event> for UserStore {}
138
/// Messages processed sequentially by the `_maintain_contacts` task.
enum UpdateContacts {
    /// Apply a contact delta received from the server.
    Update(proto::UpdateContacts),
    /// No-op marker: the barrier sender is dropped once this message is
    /// reached, i.e. after every message queued before it has been handled.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
144
145impl UserStore {
/// Creates the store, registers its RPC message handlers on `client`, and
/// spawns the two background tasks that keep it up to date:
///
/// * `_maintain_contacts` applies queued `UpdateContacts` messages one by one.
/// * `_maintain_current_user` follows the client's connection status, loading
///   the current user on connect and clearing contacts on sign-out or
///   connection loss.
///
/// Only a weak reference to `client` is retained.
pub fn new(client: Arc<Client>, cx: &ModelContext<Self>) -> Self {
    let (mut current_user_tx, current_user_rx) = watch::channel();
    let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
    // Register handlers for the server messages this store reacts to. The
    // returned subscriptions are moved into the contacts task below.
    let rpc_subscriptions = vec![
        client.add_message_handler(cx.weak_model(), Self::handle_update_plan),
        client.add_message_handler(cx.weak_model(), Self::handle_update_contacts),
        client.add_message_handler(cx.weak_model(), Self::handle_update_invite_info),
        client.add_message_handler(cx.weak_model(), Self::handle_show_contacts),
    ];
    Self {
        users: Default::default(),
        by_github_login: Default::default(),
        current_user: current_user_rx,
        current_plan: None,
        accepted_tos_at: None,
        contacts: Default::default(),
        incoming_contact_requests: Default::default(),
        participant_indices: Default::default(),
        outgoing_contact_requests: Default::default(),
        invite_info: None,
        client: Arc::downgrade(&client),
        update_contacts_tx,
        _maintain_contacts: cx.spawn(|this, mut cx| async move {
            // Hold the subscriptions for as long as this task lives.
            let _subscriptions = rpc_subscriptions;
            // Each message is fully applied (its task awaited) before the
            // next one is taken, so contact updates never interleave.
            while let Some(message) = update_contacts_rx.next().await {
                if let Ok(task) =
                    this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
                {
                    task.log_err().await;
                } else {
                    // The store could not be updated any more; stop processing.
                    break;
                }
            }
        }),
        _maintain_current_user: cx.spawn(|this, mut cx| async move {
            let mut status = client.status();
            // Keep only a weak handle so this task does not keep the client alive.
            let weak = Arc::downgrade(&client);
            drop(client);
            while let Some(status) = status.next().await {
                // if the client is dropped, the app is shutting down.
                let Some(client) = weak.upgrade() else {
                    return Ok(());
                };
                match status {
                    Status::Connected { .. } => {
                        if let Some(user_id) = client.user_id() {
                            let fetch_user = if let Ok(fetch_user) = this
                                .update(&mut cx, |this, cx| {
                                    this.get_user(user_id, cx).log_err()
                                }) {
                                fetch_user
                            } else {
                                break;
                            };
                            let fetch_private_user_info =
                                client.request(proto::GetPrivateUserInfo {}).log_err();
                            // Load the public user record and the private info concurrently.
                            let (user, info) =
                                futures::join!(fetch_user, fetch_private_user_info);

                            cx.update(|cx| {
                                if let Some(info) = info {
                                    // Staff status can be switched off locally by setting
                                    // ZED_DISABLE_STAFF to anything other than "" or "0".
                                    let disable_staff = std::env::var("ZED_DISABLE_STAFF")
                                        .map_or(false, |v| !v.is_empty() && v != "0");
                                    let staff = info.staff && !disable_staff;
                                    cx.update_flags(staff, info.flags);
                                    client.telemetry.set_authenticated_user_info(
                                        Some(info.metrics_id.clone()),
                                        staff,
                                    );

                                    this.update(cx, |this, cx| {
                                        // In debug builds, setting ZED_IGNORE_ACCEPTED_TOS makes
                                        // the user count as not having accepted the terms.
                                        let accepted_tos_at = {
                                            #[cfg(debug_assertions)]
                                            if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
                                            {
                                                None
                                            } else {
                                                info.accepted_tos_at
                                            }

                                            #[cfg(not(debug_assertions))]
                                            info.accepted_tos_at
                                        };

                                        this.set_current_user_accepted_tos_at(accepted_tos_at);
                                        cx.emit(Event::TermsStatusUpdated {
                                            accepted: accepted_tos_at.is_some(),
                                        });
                                    })
                                } else {
                                    anyhow::Ok(())
                                }
                            })??;

                            // Publish the user (or `None` if loading it failed) to watchers.
                            current_user_tx.send(user).await.ok();

                            this.update(&mut cx, |_, cx| cx.notify())?;
                        }
                    }
                    Status::SignedOut => {
                        // Signing out resets the current user and drops all contact state.
                        current_user_tx.send(None).await.ok();
                        this.update(&mut cx, |this, cx| {
                            cx.notify();
                            this.clear_contacts()
                        })?
                        .await;
                    }
                    Status::ConnectionLost => {
                        // The current user is kept, but contact state is cleared.
                        this.update(&mut cx, |this, cx| {
                            cx.notify();
                            this.clear_contacts()
                        })?
                        .await;
                    }
                    _ => {}
                }
            }
            Ok(())
        }),
        pending_contact_requests: Default::default(),
        weak_self: cx.weak_model(),
    }
}
269
/// Test helper: forgets every cached user, including the login index.
#[cfg(feature = "test-support")]
pub fn clear_cache(&mut self) {
    // Drop the secondary index together with the primary cache so the two
    // can never disagree.
    self.by_github_login.clear();
    self.users.clear();
}
275
276 async fn handle_update_invite_info(
277 this: Model<Self>,
278 message: TypedEnvelope<proto::UpdateInviteInfo>,
279 mut cx: AsyncAppContext,
280 ) -> Result<()> {
281 this.update(&mut cx, |this, cx| {
282 this.invite_info = Some(InviteInfo {
283 url: Arc::from(message.payload.url),
284 count: message.payload.count,
285 });
286 cx.notify();
287 })?;
288 Ok(())
289 }
290
291 async fn handle_show_contacts(
292 this: Model<Self>,
293 _: TypedEnvelope<proto::ShowContacts>,
294 mut cx: AsyncAppContext,
295 ) -> Result<()> {
296 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
297 Ok(())
298 }
299
/// Returns the invite info last received from the server, or `None` if no
/// `UpdateInviteInfo` message has been handled yet.
pub fn invite_info(&self) -> Option<&InviteInfo> {
    self.invite_info.as_ref()
}
303
304 async fn handle_update_contacts(
305 this: Model<Self>,
306 message: TypedEnvelope<proto::UpdateContacts>,
307 mut cx: AsyncAppContext,
308 ) -> Result<()> {
309 this.update(&mut cx, |this, _| {
310 this.update_contacts_tx
311 .unbounded_send(UpdateContacts::Update(message.payload))
312 .unwrap();
313 })?;
314 Ok(())
315 }
316
317 async fn handle_update_plan(
318 this: Model<Self>,
319 message: TypedEnvelope<proto::UpdateUserPlan>,
320 mut cx: AsyncAppContext,
321 ) -> Result<()> {
322 this.update(&mut cx, |this, cx| {
323 this.current_plan = Some(message.payload.plan());
324 cx.notify();
325 })?;
326 Ok(())
327 }
328
/// Applies one queued [`UpdateContacts`] message.
///
/// * `Wait` only drops its barrier, signalling that every earlier message has
///   been processed.
/// * `Clear` empties the contact list and both request lists, then drops its
///   barrier.
/// * `Update` loads every referenced user, then applies removals followed by
///   insertions/replacements to each list, keeping the lists sorted by GitHub
///   login. An [`Event::Contact`] with `Cancelled` is emitted for each removed
///   incoming request.
///
/// The returned task fails if a referenced user cannot be loaded or the store
/// has been dropped in the meantime.
fn update_contacts(
    &mut self,
    message: UpdateContacts,
    cx: &ModelContext<Self>,
) -> Task<Result<()>> {
    match message {
        UpdateContacts::Wait(barrier) => {
            drop(barrier);
            Task::ready(Ok(()))
        }
        UpdateContacts::Clear(barrier) => {
            self.contacts.clear();
            self.incoming_contact_requests.clear();
            self.outgoing_contact_requests.clear();
            drop(barrier);
            Task::ready(Ok(()))
        }
        UpdateContacts::Update(message) => {
            // Collect every user id mentioned by the update so they can be
            // fetched with a single request.
            let mut user_ids = HashSet::default();
            for contact in &message.contacts {
                user_ids.insert(contact.user_id);
            }
            user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
            user_ids.extend(message.outgoing_requests.iter());

            let load_users = self.get_users(user_ids.into_iter().collect(), cx);
            cx.spawn(|this, mut cx| async move {
                load_users.await?;

                // Users are fetched in parallel above and cached in call to get_users
                // No need to parallelize here
                let mut updated_contacts = Vec::new();
                let this = this
                    .upgrade()
                    .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
                for contact in message.contacts {
                    updated_contacts.push(Arc::new(
                        Contact::from_proto(contact, &this, &mut cx).await?,
                    ));
                }

                let mut incoming_requests = Vec::new();
                for request in message.incoming_requests {
                    incoming_requests.push({
                        this.update(&mut cx, |this, cx| {
                            this.get_user(request.requester_id, cx)
                        })?
                        .await?
                    });
                }

                let mut outgoing_requests = Vec::new();
                for requested_user_id in message.outgoing_requests {
                    outgoing_requests.push(
                        this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
                            .await?,
                    );
                }

                let removed_contacts =
                    HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                let removed_incoming_requests =
                    HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                let removed_outgoing_requests =
                    HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                // All list mutations happen inside one `update` call, so observers
                // see the new state after a single `notify`.
                this.update(&mut cx, |this, cx| {
                    // Remove contacts
                    this.contacts
                        .retain(|contact| !removed_contacts.contains(&contact.user.id));
                    // Update existing contacts and insert new ones
                    for updated_contact in updated_contacts {
                        match this.contacts.binary_search_by_key(
                            &&updated_contact.user.github_login,
                            |contact| &contact.user.github_login,
                        ) {
                            Ok(ix) => this.contacts[ix] = updated_contact,
                            Err(ix) => this.contacts.insert(ix, updated_contact),
                        }
                    }

                    // Remove incoming contact requests
                    this.incoming_contact_requests.retain(|user| {
                        if removed_incoming_requests.contains(&user.id) {
                            cx.emit(Event::Contact {
                                user: user.clone(),
                                kind: ContactEventKind::Cancelled,
                            });
                            false
                        } else {
                            true
                        }
                    });
                    // Update existing incoming requests and insert new ones
                    for user in incoming_requests {
                        match this
                            .incoming_contact_requests
                            .binary_search_by_key(&&user.github_login, |contact| {
                                &contact.github_login
                            }) {
                            Ok(ix) => this.incoming_contact_requests[ix] = user,
                            Err(ix) => this.incoming_contact_requests.insert(ix, user),
                        }
                    }

                    // Remove outgoing contact requests
                    this.outgoing_contact_requests
                        .retain(|user| !removed_outgoing_requests.contains(&user.id));
                    // Update existing outgoing requests and insert new ones
                    for request in outgoing_requests {
                        match this
                            .outgoing_contact_requests
                            .binary_search_by_key(&&request.github_login, |contact| {
                                &contact.github_login
                            }) {
                            Ok(ix) => this.outgoing_contact_requests[ix] = request,
                            Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                        }
                    }

                    cx.notify();
                })?;

                Ok(())
            })
        }
    }
}
457
458 pub fn contacts(&self) -> &[Arc<Contact>] {
459 &self.contacts
460 }
461
462 pub fn has_contact(&self, user: &Arc<User>) -> bool {
463 self.contacts
464 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
465 .is_ok()
466 }
467
468 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
469 &self.incoming_contact_requests
470 }
471
472 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
473 &self.outgoing_contact_requests
474 }
475
476 pub fn is_contact_request_pending(&self, user: &User) -> bool {
477 self.pending_contact_requests.contains_key(&user.id)
478 }
479
480 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
481 if self
482 .contacts
483 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
484 .is_ok()
485 {
486 ContactRequestStatus::RequestAccepted
487 } else if self
488 .outgoing_contact_requests
489 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
490 .is_ok()
491 {
492 ContactRequestStatus::RequestSent
493 } else if self
494 .incoming_contact_requests
495 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
496 .is_ok()
497 {
498 ContactRequestStatus::RequestReceived
499 } else {
500 ContactRequestStatus::None
501 }
502 }
503
504 pub fn request_contact(
505 &mut self,
506 responder_id: u64,
507 cx: &mut ModelContext<Self>,
508 ) -> Task<Result<()>> {
509 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
510 }
511
512 pub fn remove_contact(
513 &mut self,
514 user_id: u64,
515 cx: &mut ModelContext<Self>,
516 ) -> Task<Result<()>> {
517 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
518 }
519
520 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
521 self.incoming_contact_requests
522 .iter()
523 .any(|user| user.id == user_id)
524 }
525
526 pub fn respond_to_contact_request(
527 &mut self,
528 requester_id: u64,
529 accept: bool,
530 cx: &mut ModelContext<Self>,
531 ) -> Task<Result<()>> {
532 self.perform_contact_request(
533 requester_id,
534 proto::RespondToContactRequest {
535 requester_id,
536 response: if accept {
537 proto::ContactRequestResponse::Accept
538 } else {
539 proto::ContactRequestResponse::Decline
540 } as i32,
541 },
542 cx,
543 )
544 }
545
546 pub fn dismiss_contact_request(
547 &self,
548 requester_id: u64,
549 cx: &ModelContext<Self>,
550 ) -> Task<Result<()>> {
551 let client = self.client.upgrade();
552 cx.spawn(move |_, _| async move {
553 client
554 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
555 .request(proto::RespondToContactRequest {
556 requester_id,
557 response: proto::ContactRequestResponse::Dismiss as i32,
558 })
559 .await?;
560 Ok(())
561 })
562 }
563
564 fn perform_contact_request<T: RequestMessage>(
565 &mut self,
566 user_id: u64,
567 request: T,
568 cx: &mut ModelContext<Self>,
569 ) -> Task<Result<()>> {
570 let client = self.client.upgrade();
571 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
572 cx.notify();
573
574 cx.spawn(move |this, mut cx| async move {
575 let response = client
576 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
577 .request(request)
578 .await;
579 this.update(&mut cx, |this, cx| {
580 if let Entry::Occupied(mut request_count) =
581 this.pending_contact_requests.entry(user_id)
582 {
583 *request_count.get_mut() -= 1;
584 if *request_count.get() == 0 {
585 request_count.remove();
586 }
587 }
588 cx.notify();
589 })?;
590 response?;
591 Ok(())
592 })
593 }
594
595 pub fn clear_contacts(&self) -> impl Future<Output = ()> {
596 let (tx, mut rx) = postage::barrier::channel();
597 self.update_contacts_tx
598 .unbounded_send(UpdateContacts::Clear(tx))
599 .unwrap();
600 async move {
601 rx.next().await;
602 }
603 }
604
605 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
606 let (tx, mut rx) = postage::barrier::channel();
607 self.update_contacts_tx
608 .unbounded_send(UpdateContacts::Wait(tx))
609 .unwrap();
610 async move {
611 rx.next().await;
612 }
613 }
614
615 pub fn get_users(
616 &self,
617 user_ids: Vec<u64>,
618 cx: &ModelContext<Self>,
619 ) -> Task<Result<Vec<Arc<User>>>> {
620 let mut user_ids_to_fetch = user_ids.clone();
621 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
622
623 cx.spawn(|this, mut cx| async move {
624 if !user_ids_to_fetch.is_empty() {
625 this.update(&mut cx, |this, cx| {
626 this.load_users(
627 proto::GetUsers {
628 user_ids: user_ids_to_fetch,
629 },
630 cx,
631 )
632 })?
633 .await?;
634 }
635
636 this.update(&mut cx, |this, _| {
637 user_ids
638 .iter()
639 .map(|user_id| {
640 this.users
641 .get(user_id)
642 .cloned()
643 .ok_or_else(|| anyhow!("user {} not found", user_id))
644 })
645 .collect()
646 })?
647 })
648 }
649
650 pub fn fuzzy_search_users(
651 &self,
652 query: String,
653 cx: &ModelContext<Self>,
654 ) -> Task<Result<Vec<Arc<User>>>> {
655 self.load_users(proto::FuzzySearchUsers { query }, cx)
656 }
657
658 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
659 self.users.get(&user_id).cloned()
660 }
661
662 pub fn get_user_optimistic(&self, user_id: u64, cx: &ModelContext<Self>) -> Option<Arc<User>> {
663 if let Some(user) = self.users.get(&user_id).cloned() {
664 return Some(user);
665 }
666
667 self.get_user(user_id, cx).detach_and_log_err(cx);
668 None
669 }
670
671 pub fn get_user(&self, user_id: u64, cx: &ModelContext<Self>) -> Task<Result<Arc<User>>> {
672 if let Some(user) = self.users.get(&user_id).cloned() {
673 return Task::ready(Ok(user));
674 }
675
676 let load_users = self.get_users(vec![user_id], cx);
677 cx.spawn(move |this, mut cx| async move {
678 load_users.await?;
679 this.update(&mut cx, |this, _| {
680 this.users
681 .get(&user_id)
682 .cloned()
683 .ok_or_else(|| anyhow!("server responded with no users"))
684 })?
685 })
686 }
687
688 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
689 self.by_github_login
690 .get(github_login)
691 .and_then(|id| self.users.get(id).cloned())
692 }
693
694 pub fn current_user(&self) -> Option<Arc<User>> {
695 self.current_user.borrow().clone()
696 }
697
/// Returns the plan last announced by the server, or `None` if no
/// `UpdateUserPlan` message has been handled yet.
pub fn current_plan(&self) -> Option<proto::Plan> {
    self.current_plan
}
701
702 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
703 self.current_user.clone()
704 }
705
706 pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
707 self.accepted_tos_at
708 .map(|accepted_tos_at| accepted_tos_at.is_some())
709 }
710
711 pub fn accept_terms_of_service(&self, cx: &ModelContext<Self>) -> Task<Result<()>> {
712 if self.current_user().is_none() {
713 return Task::ready(Err(anyhow!("no current user")));
714 };
715
716 let client = self.client.clone();
717 cx.spawn(move |this, mut cx| async move {
718 if let Some(client) = client.upgrade() {
719 let response = client
720 .request(proto::AcceptTermsOfService {})
721 .await
722 .context("error accepting tos")?;
723
724 this.update(&mut cx, |this, cx| {
725 this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
726 cx.emit(Event::TermsStatusUpdated { accepted: true });
727 })
728 } else {
729 Err(anyhow!("client not found"))
730 }
731 })
732 }
733
734 fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
735 self.accepted_tos_at = Some(
736 accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
737 );
738 }
739
740 fn load_users(
741 &self,
742 request: impl RequestMessage<Response = UsersResponse>,
743 cx: &ModelContext<Self>,
744 ) -> Task<Result<Vec<Arc<User>>>> {
745 let client = self.client.clone();
746 cx.spawn(|this, mut cx| async move {
747 if let Some(rpc) = client.upgrade() {
748 let response = rpc.request(request).await.context("error loading users")?;
749 let users = response.users;
750
751 this.update(&mut cx, |this, _| this.insert(users))
752 } else {
753 Ok(Vec::new())
754 }
755 })
756 }
757
758 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
759 let mut ret = Vec::with_capacity(users.len());
760 for user in users {
761 let user = User::new(user);
762 if let Some(old) = self.users.insert(user.id, user.clone()) {
763 if old.github_login != user.github_login {
764 self.by_github_login.remove(&old.github_login);
765 }
766 }
767 self.by_github_login
768 .insert(user.github_login.clone(), user.id);
769 ret.push(user)
770 }
771 ret
772 }
773
774 pub fn set_participant_indices(
775 &mut self,
776 participant_indices: HashMap<u64, ParticipantIndex>,
777 cx: &mut ModelContext<Self>,
778 ) {
779 if participant_indices != self.participant_indices {
780 self.participant_indices = participant_indices;
781 cx.emit(Event::ParticipantIndicesChanged);
782 }
783 }
784
/// Returns the current user-id -> participant-index map.
pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
    &self.participant_indices
}
788
789 pub fn participant_names(
790 &self,
791 user_ids: impl Iterator<Item = u64>,
792 cx: &AppContext,
793 ) -> HashMap<u64, SharedString> {
794 let mut ret = HashMap::default();
795 let mut missing_user_ids = Vec::new();
796 for id in user_ids {
797 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
798 ret.insert(id, github_login.into());
799 } else {
800 missing_user_ids.push(id)
801 }
802 }
803 if !missing_user_ids.is_empty() {
804 let this = self.weak_self.clone();
805 cx.spawn(|mut cx| async move {
806 this.update(&mut cx, |this, cx| this.get_users(missing_user_ids, cx))?
807 .await
808 })
809 .detach_and_log_err(cx);
810 }
811 ret
812 }
813}
814
815impl User {
816 fn new(message: proto::User) -> Arc<Self> {
817 Arc::new(User {
818 id: message.id,
819 github_login: message.github_login,
820 avatar_uri: message.avatar_url.into(),
821 name: message.name,
822 email: message.email,
823 })
824 }
825}
826
827impl Contact {
828 async fn from_proto(
829 contact: proto::Contact,
830 user_store: &Model<UserStore>,
831 cx: &mut AsyncAppContext,
832 ) -> Result<Self> {
833 let user = user_store
834 .update(cx, |user_store, cx| {
835 user_store.get_user(contact.user_id, cx)
836 })?
837 .await?;
838 Ok(Self {
839 user,
840 online: contact.online,
841 busy: contact.busy,
842 })
843 }
844}
845
846impl Collaborator {
847 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
848 Ok(Self {
849 peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
850 replica_id: message.replica_id as ReplicaId,
851 user_id: message.user_id as UserId,
852 is_host: message.is_host,
853 })
854 }
855}