1use super::{proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use feature_flags2::FeatureFlagAppExt;
5use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
6use gpui2::{AsyncAppContext, EventEmitter, ImageData, Model, ModelContext, Task};
7use postage::{sink::Sink, watch};
8use rpc2::proto::{RequestMessage, UsersResponse};
9use std::sync::{Arc, Weak};
10use text::ReplicaId;
11use util::http::HttpClient;
12use util::TryFutureExt as _;
13
/// Numeric identifier of a user; an alias for `u64`.
pub type UserId = u64;
15
/// Index assigned to a participant, wrapped in a newtype around `u32`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ParticipantIndex(pub u32);
18
19#[derive(Default, Debug)]
20pub struct User {
21 pub id: UserId,
22 pub github_login: String,
23 pub avatar: Option<Arc<ImageData>>,
24}
25
26#[derive(Clone, Debug, PartialEq, Eq)]
27pub struct Collaborator {
28 pub peer_id: proto::PeerId,
29 pub replica_id: ReplicaId,
30 pub user_id: UserId,
31}
32
33impl PartialOrd for User {
34 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
35 Some(self.cmp(other))
36 }
37}
38
39impl Ord for User {
40 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
41 self.github_login.cmp(&other.github_login)
42 }
43}
44
45impl PartialEq for User {
46 fn eq(&self, other: &Self) -> bool {
47 self.id == other.id && self.github_login == other.github_login
48 }
49}
50
51impl Eq for User {}
52
53#[derive(Debug, PartialEq)]
54pub struct Contact {
55 pub user: Arc<User>,
56 pub online: bool,
57 pub busy: bool,
58}
59
/// Relationship between the store's contact lists and a given user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ContactRequestStatus {
    /// The user is in none of the contact or request lists.
    None,
    /// The user is in the outgoing contact requests.
    RequestSent,
    /// The user is in the incoming contact requests.
    RequestReceived,
    /// The user is in the contacts list.
    RequestAccepted,
}
67
68pub struct UserStore {
69 users: HashMap<u64, Arc<User>>,
70 participant_indices: HashMap<u64, ParticipantIndex>,
71 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
72 current_user: watch::Receiver<Option<Arc<User>>>,
73 contacts: Vec<Arc<Contact>>,
74 incoming_contact_requests: Vec<Arc<User>>,
75 outgoing_contact_requests: Vec<Arc<User>>,
76 pending_contact_requests: HashMap<u64, usize>,
77 invite_info: Option<InviteInfo>,
78 client: Weak<Client>,
79 http: Arc<dyn HttpClient>,
80 _maintain_contacts: Task<()>,
81 _maintain_current_user: Task<Result<()>>,
82}
83
/// Invite information: a count and a URL, as carried by `proto::UpdateInviteInfo`.
#[derive(Clone)]
pub struct InviteInfo {
    /// The `count` value from the invite-info message.
    pub count: u32,
    /// The `url` value from the invite-info message.
    pub url: Arc<str>,
}
89
90pub enum Event {
91 Contact {
92 user: Arc<User>,
93 kind: ContactEventKind,
94 },
95 ShowContacts,
96 ParticipantIndicesChanged,
97}
98
/// What happened in an `Event::Contact`.
#[derive(Copy, Clone)]
pub enum ContactEventKind {
    /// Emitted for an incoming contact request flagged `should_notify`.
    Requested,
    /// Emitted for an updated contact flagged `should_notify`.
    Accepted,
    /// Emitted when an incoming contact request is removed.
    Cancelled,
}
105
106impl EventEmitter for UserStore {
107 type Event = Event;
108}
109
110enum UpdateContacts {
111 Update(proto::UpdateContacts),
112 Wait(postage::barrier::Sender),
113 Clear(postage::barrier::Sender),
114}
115
116impl UserStore {
117 pub fn new(
118 client: Arc<Client>,
119 http: Arc<dyn HttpClient>,
120 cx: &mut ModelContext<Self>,
121 ) -> Self {
122 let (mut current_user_tx, current_user_rx) = watch::channel();
123 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
124 let rpc_subscriptions = vec![
125 client.add_message_handler(cx.weak_model(), Self::handle_update_contacts),
126 client.add_message_handler(cx.weak_model(), Self::handle_update_invite_info),
127 client.add_message_handler(cx.weak_model(), Self::handle_show_contacts),
128 ];
129 Self {
130 users: Default::default(),
131 current_user: current_user_rx,
132 contacts: Default::default(),
133 incoming_contact_requests: Default::default(),
134 participant_indices: Default::default(),
135 outgoing_contact_requests: Default::default(),
136 invite_info: None,
137 client: Arc::downgrade(&client),
138 update_contacts_tx,
139 http,
140 _maintain_contacts: cx.spawn(|this, mut cx| async move {
141 let _subscriptions = rpc_subscriptions;
142 while let Some(message) = update_contacts_rx.next().await {
143 if let Ok(task) =
144 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
145 {
146 task.log_err().await;
147 } else {
148 break;
149 }
150 }
151 }),
152 _maintain_current_user: cx.spawn(|this, mut cx| async move {
153 let mut status = client.status();
154 while let Some(status) = status.next().await {
155 match status {
156 Status::Connected { .. } => {
157 if let Some(user_id) = client.user_id() {
158 let fetch_user = if let Ok(fetch_user) = this
159 .update(&mut cx, |this, cx| {
160 this.get_user(user_id, cx).log_err()
161 }) {
162 fetch_user
163 } else {
164 break;
165 };
166 let fetch_metrics_id =
167 client.request(proto::GetPrivateUserInfo {}).log_err();
168 let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
169
170 cx.update(|cx| {
171 if let Some(info) = info {
172 cx.update_flags(info.staff, info.flags);
173 client.telemetry.set_authenticated_user_info(
174 Some(info.metrics_id.clone()),
175 info.staff,
176 cx,
177 )
178 }
179 })?;
180
181 current_user_tx.send(user).await.ok();
182
183 this.update(&mut cx, |_, cx| cx.notify())?;
184 }
185 }
186 Status::SignedOut => {
187 current_user_tx.send(None).await.ok();
188 this.update(&mut cx, |this, cx| {
189 cx.notify();
190 this.clear_contacts()
191 })?
192 .await;
193 }
194 Status::ConnectionLost => {
195 this.update(&mut cx, |this, cx| {
196 cx.notify();
197 this.clear_contacts()
198 })?
199 .await;
200 }
201 _ => {}
202 }
203 }
204 Ok(())
205 }),
206 pending_contact_requests: Default::default(),
207 }
208 }
209
/// Empties the cache of loaded users. Only compiled with the `test-support` feature.
#[cfg(feature = "test-support")]
pub fn clear_cache(&mut self) {
    self.users.clear();
}
214
215 async fn handle_update_invite_info(
216 this: Model<Self>,
217 message: TypedEnvelope<proto::UpdateInviteInfo>,
218 _: Arc<Client>,
219 mut cx: AsyncAppContext,
220 ) -> Result<()> {
221 this.update(&mut cx, |this, cx| {
222 this.invite_info = Some(InviteInfo {
223 url: Arc::from(message.payload.url),
224 count: message.payload.count,
225 });
226 cx.notify();
227 })?;
228 Ok(())
229 }
230
231 async fn handle_show_contacts(
232 this: Model<Self>,
233 _: TypedEnvelope<proto::ShowContacts>,
234 _: Arc<Client>,
235 mut cx: AsyncAppContext,
236 ) -> Result<()> {
237 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
238 Ok(())
239 }
240
241 pub fn invite_info(&self) -> Option<&InviteInfo> {
242 self.invite_info.as_ref()
243 }
244
245 async fn handle_update_contacts(
246 this: Model<Self>,
247 message: TypedEnvelope<proto::UpdateContacts>,
248 _: Arc<Client>,
249 mut cx: AsyncAppContext,
250 ) -> Result<()> {
251 this.update(&mut cx, |this, _| {
252 this.update_contacts_tx
253 .unbounded_send(UpdateContacts::Update(message.payload))
254 .unwrap();
255 })?;
256 Ok(())
257 }
258
259 fn update_contacts(
260 &mut self,
261 message: UpdateContacts,
262 cx: &mut ModelContext<Self>,
263 ) -> Task<Result<()>> {
264 match message {
265 UpdateContacts::Wait(barrier) => {
266 drop(barrier);
267 Task::ready(Ok(()))
268 }
269 UpdateContacts::Clear(barrier) => {
270 self.contacts.clear();
271 self.incoming_contact_requests.clear();
272 self.outgoing_contact_requests.clear();
273 drop(barrier);
274 Task::ready(Ok(()))
275 }
276 UpdateContacts::Update(message) => {
277 let mut user_ids = HashSet::default();
278 for contact in &message.contacts {
279 user_ids.insert(contact.user_id);
280 }
281 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
282 user_ids.extend(message.outgoing_requests.iter());
283
284 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
285 cx.spawn(|this, mut cx| async move {
286 load_users.await?;
287
288 // Users are fetched in parallel above and cached in call to get_users
289 // No need to paralellize here
290 let mut updated_contacts = Vec::new();
291 let this = this
292 .upgrade()
293 .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
294 for contact in message.contacts {
295 let should_notify = contact.should_notify;
296 updated_contacts.push((
297 Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
298 should_notify,
299 ));
300 }
301
302 let mut incoming_requests = Vec::new();
303 for request in message.incoming_requests {
304 incoming_requests.push({
305 let user = this
306 .update(&mut cx, |this, cx| {
307 this.get_user(request.requester_id, cx)
308 })?
309 .await?;
310 (user, request.should_notify)
311 });
312 }
313
314 let mut outgoing_requests = Vec::new();
315 for requested_user_id in message.outgoing_requests {
316 outgoing_requests.push(
317 this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
318 .await?,
319 );
320 }
321
322 let removed_contacts =
323 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
324 let removed_incoming_requests =
325 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
326 let removed_outgoing_requests =
327 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
328
329 this.update(&mut cx, |this, cx| {
330 // Remove contacts
331 this.contacts
332 .retain(|contact| !removed_contacts.contains(&contact.user.id));
333 // Update existing contacts and insert new ones
334 for (updated_contact, should_notify) in updated_contacts {
335 if should_notify {
336 cx.emit(Event::Contact {
337 user: updated_contact.user.clone(),
338 kind: ContactEventKind::Accepted,
339 });
340 }
341 match this.contacts.binary_search_by_key(
342 &&updated_contact.user.github_login,
343 |contact| &contact.user.github_login,
344 ) {
345 Ok(ix) => this.contacts[ix] = updated_contact,
346 Err(ix) => this.contacts.insert(ix, updated_contact),
347 }
348 }
349
350 // Remove incoming contact requests
351 this.incoming_contact_requests.retain(|user| {
352 if removed_incoming_requests.contains(&user.id) {
353 cx.emit(Event::Contact {
354 user: user.clone(),
355 kind: ContactEventKind::Cancelled,
356 });
357 false
358 } else {
359 true
360 }
361 });
362 // Update existing incoming requests and insert new ones
363 for (user, should_notify) in incoming_requests {
364 if should_notify {
365 cx.emit(Event::Contact {
366 user: user.clone(),
367 kind: ContactEventKind::Requested,
368 });
369 }
370
371 match this
372 .incoming_contact_requests
373 .binary_search_by_key(&&user.github_login, |contact| {
374 &contact.github_login
375 }) {
376 Ok(ix) => this.incoming_contact_requests[ix] = user,
377 Err(ix) => this.incoming_contact_requests.insert(ix, user),
378 }
379 }
380
381 // Remove outgoing contact requests
382 this.outgoing_contact_requests
383 .retain(|user| !removed_outgoing_requests.contains(&user.id));
384 // Update existing incoming requests and insert new ones
385 for request in outgoing_requests {
386 match this
387 .outgoing_contact_requests
388 .binary_search_by_key(&&request.github_login, |contact| {
389 &contact.github_login
390 }) {
391 Ok(ix) => this.outgoing_contact_requests[ix] = request,
392 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
393 }
394 }
395
396 cx.notify();
397 })?;
398
399 Ok(())
400 })
401 }
402 }
403 }
404
405 pub fn contacts(&self) -> &[Arc<Contact>] {
406 &self.contacts
407 }
408
409 pub fn has_contact(&self, user: &Arc<User>) -> bool {
410 self.contacts
411 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
412 .is_ok()
413 }
414
415 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
416 &self.incoming_contact_requests
417 }
418
419 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
420 &self.outgoing_contact_requests
421 }
422
423 pub fn is_contact_request_pending(&self, user: &User) -> bool {
424 self.pending_contact_requests.contains_key(&user.id)
425 }
426
427 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
428 if self
429 .contacts
430 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
431 .is_ok()
432 {
433 ContactRequestStatus::RequestAccepted
434 } else if self
435 .outgoing_contact_requests
436 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
437 .is_ok()
438 {
439 ContactRequestStatus::RequestSent
440 } else if self
441 .incoming_contact_requests
442 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
443 .is_ok()
444 {
445 ContactRequestStatus::RequestReceived
446 } else {
447 ContactRequestStatus::None
448 }
449 }
450
451 pub fn request_contact(
452 &mut self,
453 responder_id: u64,
454 cx: &mut ModelContext<Self>,
455 ) -> Task<Result<()>> {
456 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
457 }
458
459 pub fn remove_contact(
460 &mut self,
461 user_id: u64,
462 cx: &mut ModelContext<Self>,
463 ) -> Task<Result<()>> {
464 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
465 }
466
467 pub fn respond_to_contact_request(
468 &mut self,
469 requester_id: u64,
470 accept: bool,
471 cx: &mut ModelContext<Self>,
472 ) -> Task<Result<()>> {
473 self.perform_contact_request(
474 requester_id,
475 proto::RespondToContactRequest {
476 requester_id,
477 response: if accept {
478 proto::ContactRequestResponse::Accept
479 } else {
480 proto::ContactRequestResponse::Decline
481 } as i32,
482 },
483 cx,
484 )
485 }
486
487 pub fn dismiss_contact_request(
488 &mut self,
489 requester_id: u64,
490 cx: &mut ModelContext<Self>,
491 ) -> Task<Result<()>> {
492 let client = self.client.upgrade();
493 cx.spawn(move |_, _| async move {
494 client
495 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
496 .request(proto::RespondToContactRequest {
497 requester_id,
498 response: proto::ContactRequestResponse::Dismiss as i32,
499 })
500 .await?;
501 Ok(())
502 })
503 }
504
505 fn perform_contact_request<T: RequestMessage>(
506 &mut self,
507 user_id: u64,
508 request: T,
509 cx: &mut ModelContext<Self>,
510 ) -> Task<Result<()>> {
511 let client = self.client.upgrade();
512 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
513 cx.notify();
514
515 cx.spawn(move |this, mut cx| async move {
516 let response = client
517 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
518 .request(request)
519 .await;
520 this.update(&mut cx, |this, cx| {
521 if let Entry::Occupied(mut request_count) =
522 this.pending_contact_requests.entry(user_id)
523 {
524 *request_count.get_mut() -= 1;
525 if *request_count.get() == 0 {
526 request_count.remove();
527 }
528 }
529 cx.notify();
530 })?;
531 response?;
532 Ok(())
533 })
534 }
535
536 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
537 let (tx, mut rx) = postage::barrier::channel();
538 self.update_contacts_tx
539 .unbounded_send(UpdateContacts::Clear(tx))
540 .unwrap();
541 async move {
542 rx.next().await;
543 }
544 }
545
546 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
547 let (tx, mut rx) = postage::barrier::channel();
548 self.update_contacts_tx
549 .unbounded_send(UpdateContacts::Wait(tx))
550 .unwrap();
551 async move {
552 rx.next().await;
553 }
554 }
555
556 pub fn get_users(
557 &mut self,
558 user_ids: Vec<u64>,
559 cx: &mut ModelContext<Self>,
560 ) -> Task<Result<Vec<Arc<User>>>> {
561 let mut user_ids_to_fetch = user_ids.clone();
562 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
563
564 cx.spawn(|this, mut cx| async move {
565 if !user_ids_to_fetch.is_empty() {
566 this.update(&mut cx, |this, cx| {
567 this.load_users(
568 proto::GetUsers {
569 user_ids: user_ids_to_fetch,
570 },
571 cx,
572 )
573 })?
574 .await?;
575 }
576
577 this.update(&mut cx, |this, _| {
578 user_ids
579 .iter()
580 .map(|user_id| {
581 this.users
582 .get(user_id)
583 .cloned()
584 .ok_or_else(|| anyhow!("user {} not found", user_id))
585 })
586 .collect()
587 })?
588 })
589 }
590
591 pub fn fuzzy_search_users(
592 &mut self,
593 query: String,
594 cx: &mut ModelContext<Self>,
595 ) -> Task<Result<Vec<Arc<User>>>> {
596 self.load_users(proto::FuzzySearchUsers { query }, cx)
597 }
598
599 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
600 self.users.get(&user_id).cloned()
601 }
602
603 pub fn get_user(
604 &mut self,
605 user_id: u64,
606 cx: &mut ModelContext<Self>,
607 ) -> Task<Result<Arc<User>>> {
608 if let Some(user) = self.users.get(&user_id).cloned() {
609 return Task::ready(Ok(user));
610 }
611
612 let load_users = self.get_users(vec![user_id], cx);
613 cx.spawn(move |this, mut cx| async move {
614 load_users.await?;
615 this.update(&mut cx, |this, _| {
616 this.users
617 .get(&user_id)
618 .cloned()
619 .ok_or_else(|| anyhow!("server responded with no users"))
620 })?
621 })
622 }
623
624 pub fn current_user(&self) -> Option<Arc<User>> {
625 self.current_user.borrow().clone()
626 }
627
628 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
629 self.current_user.clone()
630 }
631
632 fn load_users(
633 &mut self,
634 request: impl RequestMessage<Response = UsersResponse>,
635 cx: &mut ModelContext<Self>,
636 ) -> Task<Result<Vec<Arc<User>>>> {
637 let client = self.client.clone();
638 let http = self.http.clone();
639 cx.spawn(|this, mut cx| async move {
640 if let Some(rpc) = client.upgrade() {
641 let response = rpc.request(request).await.context("error loading users")?;
642 let users = future::join_all(
643 response
644 .users
645 .into_iter()
646 .map(|user| User::new(user, http.as_ref())),
647 )
648 .await;
649
650 this.update(&mut cx, |this, _| {
651 for user in &users {
652 this.users.insert(user.id, user.clone());
653 }
654 })
655 .ok();
656
657 Ok(users)
658 } else {
659 Ok(Vec::new())
660 }
661 })
662 }
663
664 pub fn set_participant_indices(
665 &mut self,
666 participant_indices: HashMap<u64, ParticipantIndex>,
667 cx: &mut ModelContext<Self>,
668 ) {
669 if participant_indices != self.participant_indices {
670 self.participant_indices = participant_indices;
671 cx.emit(Event::ParticipantIndicesChanged);
672 }
673 }
674
675 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
676 &self.participant_indices
677 }
678}
679
680impl User {
681 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
682 Arc::new(User {
683 id: message.id,
684 github_login: message.github_login,
685 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
686 })
687 }
688}
689
690impl Contact {
691 async fn from_proto(
692 contact: proto::Contact,
693 user_store: &Model<UserStore>,
694 cx: &mut AsyncAppContext,
695 ) -> Result<Self> {
696 let user = user_store
697 .update(cx, |user_store, cx| {
698 user_store.get_user(contact.user_id, cx)
699 })?
700 .await?;
701 Ok(Self {
702 user,
703 online: contact.online,
704 busy: contact.busy,
705 })
706 }
707}
708
709impl Collaborator {
710 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
711 Ok(Self {
712 peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
713 replica_id: message.replica_id as ReplicaId,
714 user_id: message.user_id as UserId,
715 })
716 }
717}
718
719// todo!("we probably don't need this now that we fetch")
720async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
721 let mut response = http
722 .get(url, Default::default(), true)
723 .await
724 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
725
726 if !response.status().is_success() {
727 return Err(anyhow!("avatar request failed {:?}", response.status()));
728 }
729
730 let mut body = Vec::new();
731 response
732 .body_mut()
733 .read_to_end(&mut body)
734 .await
735 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
736 let format = image::guess_format(&body)?;
737 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
738 Ok(Arc::new(ImageData::new(image)))
739}