1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use crate::incoming_call::IncomingCall;
3use anyhow::{anyhow, Context, Result};
4use collections::{hash_map::Entry, HashMap, HashSet};
5use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
6use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
7use postage::{sink::Sink, watch};
8use rpc::proto::{RequestMessage, UsersResponse};
9use std::sync::{Arc, Weak};
10use util::TryFutureExt as _;
11
/// A user as known to this client.
///
/// Instances are built from `proto::User` messages (see `User::new`) and shared
/// behind `Arc` by the `UserStore`.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric id of the user.
    pub id: u64,
    /// The user's GitHub login; the contact lists in `UserStore` are kept sorted by it.
    pub github_login: String,
    /// Decoded avatar image, or `None` when no avatar could be loaded.
    pub avatar: Option<Arc<ImageData>>,
}
18
19impl PartialOrd for User {
20 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
21 Some(self.cmp(other))
22 }
23}
24
25impl Ord for User {
26 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
27 self.github_login.cmp(&other.github_login)
28 }
29}
30
31impl PartialEq for User {
32 fn eq(&self, other: &Self) -> bool {
33 self.id == other.id && self.github_login == other.github_login
34 }
35}
36
// Marker impl: declares the `PartialEq` relation on `User` to be a full equivalence relation.
impl Eq for User {}
38
/// A contact of the current user, built from a `proto::Contact` (see `Contact::from_proto`).
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// The `online` flag copied from the server's contact message.
    pub online: bool,
}
44
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the store's contact or request lists.
    None,
    /// The user is in the store's outgoing contact requests.
    RequestSent,
    /// The user is in the store's incoming contact requests.
    RequestReceived,
    /// The user is already in the store's contacts.
    RequestAccepted,
}
52
/// Model holding the users, contacts, contact requests, invite info and incoming call
/// known to this client, kept up to date from RPC messages.
pub struct UserStore {
    /// Cache of loaded users, keyed by user id (filled by `load_users`).
    users: HashMap<u64, Arc<User>>,
    /// Sending side of the queue of contact updates drained by `_maintain_contacts`.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Watch on the currently signed-in user, fed by `_maintain_current_user`.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Current contacts, kept sorted by `github_login` so they can be binary-searched.
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, kept sorted by `github_login`.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, kept sorted by `github_login`.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact-related requests per user id
    /// (see `perform_contact_request`).
    pending_contact_requests: HashMap<u64, usize>,
    /// Most recent invite info received from the server, if any.
    invite_info: Option<InviteInfo>,
    /// Sender/receiver pair of the watch channel carrying the current incoming call.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    /// Weak reference to the RPC client; upgraded on demand when sending requests.
    client: Weak<Client>,
    /// HTTP client passed to `User::new` for fetching avatars.
    http: Arc<dyn HttpClient>,
    /// Background task that applies queued contact updates.
    _maintain_contacts: Task<()>,
    /// Background task that follows the client's connection status.
    _maintain_current_user: Task<()>,
}
71
/// Invite information received in a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The `count` field of the server message.
    pub count: u32,
    /// The `url` field of the server message.
    pub url: Arc<str>,
}
77
/// Events emitted by `UserStore`.
pub enum Event {
    /// Something happened to a contact or contact request involving `user`;
    /// `kind` says what.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
}
85
/// What happened in an `Event::Contact`.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    /// An incoming contact request arrived with `should_notify` set.
    Requested,
    /// A contact was added or updated with `should_notify` set.
    Accepted,
    /// An incoming contact request was removed.
    Cancelled,
}
92
// Makes `UserStore` a gpui entity whose emitted events are of type `Event`.
impl Entity for UserStore {
    type Event = Event;
}
96
/// Messages processed in order by the `_maintain_contacts` task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier once every earlier message has been processed
    /// (see `UserStore::contact_updates_done`).
    Wait(postage::barrier::Sender),
    /// Clear all contacts and contact requests, then drop the barrier
    /// (see `UserStore::clear_contacts`).
    Clear(postage::barrier::Sender),
}
102
103impl UserStore {
104 pub fn new(
105 client: Arc<Client>,
106 http: Arc<dyn HttpClient>,
107 cx: &mut ModelContext<Self>,
108 ) -> Self {
109 let (mut current_user_tx, current_user_rx) = watch::channel();
110 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
111 let rpc_subscriptions = vec![
112 client.add_message_handler(cx.handle(), Self::handle_update_contacts),
113 client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
114 client.add_message_handler(cx.handle(), Self::handle_show_contacts),
115 client.add_request_handler(cx.handle(), Self::handle_incoming_call),
116 client.add_message_handler(cx.handle(), Self::handle_cancel_call),
117 ];
118 Self {
119 users: Default::default(),
120 current_user: current_user_rx,
121 contacts: Default::default(),
122 incoming_contact_requests: Default::default(),
123 outgoing_contact_requests: Default::default(),
124 invite_info: None,
125 incoming_call: watch::channel(),
126 client: Arc::downgrade(&client),
127 update_contacts_tx,
128 http,
129 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
130 let _subscriptions = rpc_subscriptions;
131 while let Some(message) = update_contacts_rx.next().await {
132 if let Some(this) = this.upgrade(&cx) {
133 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
134 .log_err()
135 .await;
136 }
137 }
138 }),
139 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
140 let mut status = client.status();
141 while let Some(status) = status.next().await {
142 match status {
143 Status::Connected { .. } => {
144 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
145 let user = this
146 .update(&mut cx, |this, cx| this.get_user(user_id, cx))
147 .log_err()
148 .await;
149 current_user_tx.send(user).await.ok();
150 }
151 }
152 Status::SignedOut => {
153 current_user_tx.send(None).await.ok();
154 if let Some(this) = this.upgrade(&cx) {
155 this.update(&mut cx, |this, _| this.clear_contacts()).await;
156 }
157 }
158 Status::ConnectionLost => {
159 if let Some(this) = this.upgrade(&cx) {
160 this.update(&mut cx, |this, _| this.clear_contacts()).await;
161 }
162 }
163 _ => {}
164 }
165 }
166 }),
167 pending_contact_requests: Default::default(),
168 }
169 }
170
171 async fn handle_update_invite_info(
172 this: ModelHandle<Self>,
173 message: TypedEnvelope<proto::UpdateInviteInfo>,
174 _: Arc<Client>,
175 mut cx: AsyncAppContext,
176 ) -> Result<()> {
177 this.update(&mut cx, |this, cx| {
178 this.invite_info = Some(InviteInfo {
179 url: Arc::from(message.payload.url),
180 count: message.payload.count,
181 });
182 cx.notify();
183 });
184 Ok(())
185 }
186
187 async fn handle_show_contacts(
188 this: ModelHandle<Self>,
189 _: TypedEnvelope<proto::ShowContacts>,
190 _: Arc<Client>,
191 mut cx: AsyncAppContext,
192 ) -> Result<()> {
193 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
194 Ok(())
195 }
196
197 async fn handle_incoming_call(
198 this: ModelHandle<Self>,
199 envelope: TypedEnvelope<proto::IncomingCall>,
200 _: Arc<Client>,
201 mut cx: AsyncAppContext,
202 ) -> Result<proto::Ack> {
203 let call = IncomingCall {
204 room_id: envelope.payload.room_id,
205 participants: this
206 .update(&mut cx, |this, cx| {
207 this.get_users(envelope.payload.participant_user_ids, cx)
208 })
209 .await?,
210 caller: this
211 .update(&mut cx, |this, cx| {
212 this.get_user(envelope.payload.caller_user_id, cx)
213 })
214 .await?,
215 initial_project_id: envelope.payload.initial_project_id,
216 };
217 this.update(&mut cx, |this, _| {
218 *this.incoming_call.0.borrow_mut() = Some(call);
219 });
220
221 Ok(proto::Ack {})
222 }
223
224 async fn handle_cancel_call(
225 this: ModelHandle<Self>,
226 _: TypedEnvelope<proto::CancelCall>,
227 _: Arc<Client>,
228 mut cx: AsyncAppContext,
229 ) -> Result<()> {
230 this.update(&mut cx, |this, _| {
231 *this.incoming_call.0.borrow_mut() = None;
232 });
233 Ok(())
234 }
235
236 pub fn invite_info(&self) -> Option<&InviteInfo> {
237 self.invite_info.as_ref()
238 }
239
240 pub fn incoming_call(&self) -> watch::Receiver<Option<IncomingCall>> {
241 self.incoming_call.1.clone()
242 }
243
244 pub fn decline_call(&mut self) -> Result<()> {
245 if let Some(client) = self.client.upgrade() {
246 client.send(proto::DeclineCall {})?;
247 }
248 Ok(())
249 }
250
251 async fn handle_update_contacts(
252 this: ModelHandle<Self>,
253 message: TypedEnvelope<proto::UpdateContacts>,
254 _: Arc<Client>,
255 mut cx: AsyncAppContext,
256 ) -> Result<()> {
257 this.update(&mut cx, |this, _| {
258 this.update_contacts_tx
259 .unbounded_send(UpdateContacts::Update(message.payload))
260 .unwrap();
261 });
262 Ok(())
263 }
264
    /// Applies one queued `UpdateContacts` message to the store.
    ///
    /// * `Wait` only drops its barrier, which is what `contact_updates_done` waits for.
    /// * `Clear` empties the contacts and both request lists, then drops its barrier.
    /// * `Update` loads every referenced user, then removes, updates and inserts entries
    ///   in the three sorted lists, emitting `Event::Contact` where appropriate and
    ///   notifying observers at the end.
    ///
    /// The returned task fails if any referenced user cannot be loaded; in that case
    /// nothing from the update is applied.
    fn update_contacts(
        &mut self,
        message: UpdateContacts,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect the id of every user mentioned by the update so they can be
                // loaded with a single `get_users` call.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(|this, mut cx| async move {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in the call to get_users.
                    // No need to parallelize here.
                    let mut updated_contacts = Vec::new();
                    for contact in message.contacts {
                        let should_notify = contact.should_notify;
                        updated_contacts.push((
                            Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
                            should_notify,
                        ));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            let user = this
                                .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
                                .await?;
                            (user, request.should_notify)
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    // Everything below mutates the store in a single `update` call.
                    this.update(&mut cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for (updated_contact, should_notify) in updated_contacts {
                            if should_notify {
                                cx.emit(Event::Contact {
                                    user: updated_contact.user.clone(),
                                    kind: ContactEventKind::Accepted,
                                });
                            }
                            // The list is sorted by login: replace on a hit, otherwise
                            // insert at the position that keeps it sorted.
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for (user, should_notify) in incoming_requests {
                            if should_notify {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Requested,
                                });
                            }

                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    });

                    Ok(())
                })
            }
        }
    }
405
406 pub fn contacts(&self) -> &[Arc<Contact>] {
407 &self.contacts
408 }
409
410 pub fn has_contact(&self, user: &Arc<User>) -> bool {
411 self.contacts
412 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
413 .is_ok()
414 }
415
416 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
417 &self.incoming_contact_requests
418 }
419
420 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
421 &self.outgoing_contact_requests
422 }
423
424 pub fn is_contact_request_pending(&self, user: &User) -> bool {
425 self.pending_contact_requests.contains_key(&user.id)
426 }
427
428 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
429 if self
430 .contacts
431 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
432 .is_ok()
433 {
434 ContactRequestStatus::RequestAccepted
435 } else if self
436 .outgoing_contact_requests
437 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
438 .is_ok()
439 {
440 ContactRequestStatus::RequestSent
441 } else if self
442 .incoming_contact_requests
443 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
444 .is_ok()
445 {
446 ContactRequestStatus::RequestReceived
447 } else {
448 ContactRequestStatus::None
449 }
450 }
451
452 pub fn request_contact(
453 &mut self,
454 responder_id: u64,
455 cx: &mut ModelContext<Self>,
456 ) -> Task<Result<()>> {
457 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
458 }
459
460 pub fn remove_contact(
461 &mut self,
462 user_id: u64,
463 cx: &mut ModelContext<Self>,
464 ) -> Task<Result<()>> {
465 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
466 }
467
468 pub fn respond_to_contact_request(
469 &mut self,
470 requester_id: u64,
471 accept: bool,
472 cx: &mut ModelContext<Self>,
473 ) -> Task<Result<()>> {
474 self.perform_contact_request(
475 requester_id,
476 proto::RespondToContactRequest {
477 requester_id,
478 response: if accept {
479 proto::ContactRequestResponse::Accept
480 } else {
481 proto::ContactRequestResponse::Decline
482 } as i32,
483 },
484 cx,
485 )
486 }
487
488 pub fn dismiss_contact_request(
489 &mut self,
490 requester_id: u64,
491 cx: &mut ModelContext<Self>,
492 ) -> Task<Result<()>> {
493 let client = self.client.upgrade();
494 cx.spawn_weak(|_, _| async move {
495 client
496 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
497 .request(proto::RespondToContactRequest {
498 requester_id,
499 response: proto::ContactRequestResponse::Dismiss as i32,
500 })
501 .await?;
502 Ok(())
503 })
504 }
505
506 fn perform_contact_request<T: RequestMessage>(
507 &mut self,
508 user_id: u64,
509 request: T,
510 cx: &mut ModelContext<Self>,
511 ) -> Task<Result<()>> {
512 let client = self.client.upgrade();
513 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
514 cx.notify();
515
516 cx.spawn(|this, mut cx| async move {
517 let response = client
518 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
519 .request(request)
520 .await;
521 this.update(&mut cx, |this, cx| {
522 if let Entry::Occupied(mut request_count) =
523 this.pending_contact_requests.entry(user_id)
524 {
525 *request_count.get_mut() -= 1;
526 if *request_count.get() == 0 {
527 request_count.remove();
528 }
529 }
530 cx.notify();
531 });
532 response?;
533 Ok(())
534 })
535 }
536
537 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
538 let (tx, mut rx) = postage::barrier::channel();
539 self.update_contacts_tx
540 .unbounded_send(UpdateContacts::Clear(tx))
541 .unwrap();
542 async move {
543 rx.next().await;
544 }
545 }
546
547 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
548 let (tx, mut rx) = postage::barrier::channel();
549 self.update_contacts_tx
550 .unbounded_send(UpdateContacts::Wait(tx))
551 .unwrap();
552 async move {
553 rx.next().await;
554 }
555 }
556
557 pub fn get_users(
558 &mut self,
559 user_ids: Vec<u64>,
560 cx: &mut ModelContext<Self>,
561 ) -> Task<Result<Vec<Arc<User>>>> {
562 let mut user_ids_to_fetch = user_ids.clone();
563 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
564
565 cx.spawn(|this, mut cx| async move {
566 if !user_ids_to_fetch.is_empty() {
567 this.update(&mut cx, |this, cx| {
568 this.load_users(
569 proto::GetUsers {
570 user_ids: user_ids_to_fetch,
571 },
572 cx,
573 )
574 })
575 .await?;
576 }
577
578 this.read_with(&cx, |this, _| {
579 user_ids
580 .iter()
581 .map(|user_id| {
582 this.users
583 .get(user_id)
584 .cloned()
585 .ok_or_else(|| anyhow!("user {} not found", user_id))
586 })
587 .collect()
588 })
589 })
590 }
591
592 pub fn fuzzy_search_users(
593 &mut self,
594 query: String,
595 cx: &mut ModelContext<Self>,
596 ) -> Task<Result<Vec<Arc<User>>>> {
597 self.load_users(proto::FuzzySearchUsers { query }, cx)
598 }
599
600 pub fn get_user(
601 &mut self,
602 user_id: u64,
603 cx: &mut ModelContext<Self>,
604 ) -> Task<Result<Arc<User>>> {
605 if let Some(user) = self.users.get(&user_id).cloned() {
606 return cx.foreground().spawn(async move { Ok(user) });
607 }
608
609 let load_users = self.get_users(vec![user_id], cx);
610 cx.spawn(|this, mut cx| async move {
611 load_users.await?;
612 this.update(&mut cx, |this, _| {
613 this.users
614 .get(&user_id)
615 .cloned()
616 .ok_or_else(|| anyhow!("server responded with no users"))
617 })
618 })
619 }
620
621 pub fn current_user(&self) -> Option<Arc<User>> {
622 self.current_user.borrow().clone()
623 }
624
625 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
626 self.current_user.clone()
627 }
628
629 fn load_users(
630 &mut self,
631 request: impl RequestMessage<Response = UsersResponse>,
632 cx: &mut ModelContext<Self>,
633 ) -> Task<Result<Vec<Arc<User>>>> {
634 let client = self.client.clone();
635 let http = self.http.clone();
636 cx.spawn_weak(|this, mut cx| async move {
637 if let Some(rpc) = client.upgrade() {
638 let response = rpc.request(request).await.context("error loading users")?;
639 let users = future::join_all(
640 response
641 .users
642 .into_iter()
643 .map(|user| User::new(user, http.as_ref())),
644 )
645 .await;
646
647 if let Some(this) = this.upgrade(&cx) {
648 this.update(&mut cx, |this, _| {
649 for user in &users {
650 this.users.insert(user.id, user.clone());
651 }
652 });
653 }
654 Ok(users)
655 } else {
656 Ok(Vec::new())
657 }
658 })
659 }
660}
661
662impl User {
663 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
664 Arc::new(User {
665 id: message.id,
666 github_login: message.github_login,
667 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
668 })
669 }
670}
671
672impl Contact {
673 async fn from_proto(
674 contact: proto::Contact,
675 user_store: &ModelHandle<UserStore>,
676 cx: &mut AsyncAppContext,
677 ) -> Result<Self> {
678 let user = user_store
679 .update(cx, |user_store, cx| {
680 user_store.get_user(contact.user_id, cx)
681 })
682 .await?;
683 Ok(Self {
684 user,
685 online: contact.online,
686 })
687 }
688}
689
690async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
691 let mut response = http
692 .get(url, Default::default(), true)
693 .await
694 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
695
696 if !response.status().is_success() {
697 return Err(anyhow!("avatar request failed {:?}", response.status()));
698 }
699
700 let mut body = Vec::new();
701 response
702 .body_mut()
703 .read_to_end(&mut body)
704 .await
705 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
706 let format = image::guess_format(&body)?;
707 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
708 Ok(ImageData::new(image))
709}