1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use crate::incoming_call::IncomingCall;
3use anyhow::{anyhow, Context, Result};
4use collections::{hash_map::Entry, HashMap, HashSet};
5use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
6use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
7use postage::{sink::Sink, watch};
8use rpc::proto::{RequestMessage, UsersResponse};
9use std::sync::{Arc, Weak};
10use util::TryFutureExt as _;
11
/// A user known to the [`UserStore`], built from a `proto::User` message.
#[derive(Debug)]
pub struct User {
    /// Identifier copied from the `id` field of the `proto::User` message.
    pub id: u64,
    /// GitHub login name; used for ordering users and as the sort key of the contact lists.
    pub github_login: String,
    /// Decoded avatar image. `None` presumably means the avatar could not be fetched or
    /// decoded (the fetch result goes through `warn_on_err`) — confirm against `util`.
    pub avatar: Option<Arc<ImageData>>,
}
18
19impl PartialOrd for User {
20 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
21 Some(self.cmp(other))
22 }
23}
24
25impl Ord for User {
26 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
27 self.github_login.cmp(&other.github_login)
28 }
29}
30
31impl PartialEq for User {
32 fn eq(&self, other: &Self) -> bool {
33 self.id == other.id && self.github_login == other.github_login
34 }
35}
36
/// Marker impl: `User` equality only compares a `u64` and a `String`, so it is a full
/// equivalence relation.
impl Eq for User {}
38
/// An entry in the contact list kept by [`UserStore`].
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// Copied from the `online` field of the `proto::Contact` message.
    pub online: bool,
}
44
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the contact or contact-request lists.
    None,
    /// The user is in the outgoing contact-request list.
    RequestSent,
    /// The user is in the incoming contact-request list.
    RequestReceived,
    /// The user is in the contact list.
    RequestAccepted,
}
52
/// Model holding the users, contacts, contact requests, invite info and incoming call
/// known to this client.
pub struct UserStore {
    /// Cache of loaded users, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Sending side of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Receives the currently signed-in user, published by `_maintain_current_user`.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept sorted by `github_login` (lookups use binary search).
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, kept sorted by `github_login`.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, kept sorted by `github_login`.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id; entries are removed at zero.
    pending_contact_requests: HashMap<u64, usize>,
    /// Most recent invite info received via `proto::UpdateInviteInfo`, if any.
    invite_info: Option<InviteInfo>,
    /// Watch channel (sender, receiver) carrying the current incoming call, if any.
    incoming_call: (
        watch::Sender<Option<IncomingCall>>,
        watch::Receiver<Option<IncomingCall>>,
    ),
    /// Weak handle to the RPC client; upgraded whenever a request is sent.
    client: Weak<Client>,
    /// HTTP client used to fetch user avatars.
    http: Arc<dyn HttpClient>,
    /// Task that applies queued `UpdateContacts` messages one at a time.
    _maintain_contacts: Task<()>,
    /// Task that reacts to client status changes (current user, clearing contacts).
    _maintain_current_user: Task<()>,
}
71
/// Invite information copied from the payload of a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The `count` field of the message (presumably a number of invites — confirm with the server).
    pub count: u32,
    /// The `url` field of the message.
    pub url: Arc<str>,
}
77
/// Events emitted by [`UserStore`].
pub enum Event {
    /// Emitted while applying a contacts update; `kind` says what happened to `user`.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
}
85
/// What happened to the user carried by an `Event::Contact`.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    /// An incoming contact request arrived with `should_notify` set.
    Requested,
    /// A contact was added or updated with `should_notify` set.
    Accepted,
    /// An incoming contact request was removed.
    Cancelled,
}
92
/// Registers `UserStore` as a gpui entity whose emitted events are of type [`Event`].
impl Entity for UserStore {
    type Event = Event;
}
96
/// Messages queued on `update_contacts_tx` and processed in order by the
/// `_maintain_contacts` task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender once this message is reached.
    Wait(postage::barrier::Sender),
    /// Clear contacts and contact requests, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
102
103impl UserStore {
    /// Creates a `UserStore` and starts its two background tasks.
    ///
    /// Registers handlers on `client` for contact updates, invite info, "show contacts",
    /// incoming calls and call cancellation, then spawns:
    /// - `_maintain_contacts`, which applies queued [`UpdateContacts`] messages one at a time;
    /// - `_maintain_current_user`, which follows `client.status()` to publish the current
    ///   user and to clear contacts on sign-out or connection loss.
    pub fn new(
        client: Arc<Client>,
        http: Arc<dyn HttpClient>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        // The handler registrations are moved into `_maintain_contacts` below, so they are
        // held for as long as that task is.
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.handle(), Self::handle_update_contacts),
            client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
            client.add_message_handler(cx.handle(), Self::handle_show_contacts),
            client.add_request_handler(cx.handle(), Self::handle_incoming_call),
            client.add_message_handler(cx.handle(), Self::handle_cancel_call),
        ];
        Self {
            users: Default::default(),
            current_user: current_user_rx,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            incoming_call: watch::channel(),
            client: Arc::downgrade(&client),
            update_contacts_tx,
            http,
            _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
                let _subscriptions = rpc_subscriptions;
                // Each message is fully applied (awaited) before the next one is taken, so
                // contact updates are processed strictly in the order they were queued.
                while let Some(message) = update_contacts_rx.next().await {
                    if let Some(this) = this.upgrade(&cx) {
                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
                            .log_err()
                            .await;
                    }
                }
            }),
            // This task owns the strong `Arc<Client>` passed to `new`.
            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
                let mut status = client.status();
                while let Some(status) = status.next().await {
                    match status {
                        Status::Connected { .. } => {
                            // Only runs when the store is still alive and the client reports
                            // a user id.
                            if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
                                // `log_err` turns the load result into the `Option` that is
                                // published as the current user.
                                let user = this
                                    .update(&mut cx, |this, cx| this.get_user(user_id, cx))
                                    .log_err()
                                    .await;
                                current_user_tx.send(user).await.ok();
                            }
                        }
                        Status::SignedOut => {
                            current_user_tx.send(None).await.ok();
                            if let Some(this) = this.upgrade(&cx) {
                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
                            }
                        }
                        // Connection loss clears contacts but leaves the current user as is.
                        Status::ConnectionLost => {
                            if let Some(this) = this.upgrade(&cx) {
                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
                            }
                        }
                        _ => {}
                    }
                }
            }),
            pending_contact_requests: Default::default(),
        }
    }
170
171 async fn handle_update_invite_info(
172 this: ModelHandle<Self>,
173 message: TypedEnvelope<proto::UpdateInviteInfo>,
174 _: Arc<Client>,
175 mut cx: AsyncAppContext,
176 ) -> Result<()> {
177 this.update(&mut cx, |this, cx| {
178 this.invite_info = Some(InviteInfo {
179 url: Arc::from(message.payload.url),
180 count: message.payload.count,
181 });
182 cx.notify();
183 });
184 Ok(())
185 }
186
187 async fn handle_show_contacts(
188 this: ModelHandle<Self>,
189 _: TypedEnvelope<proto::ShowContacts>,
190 _: Arc<Client>,
191 mut cx: AsyncAppContext,
192 ) -> Result<()> {
193 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
194 Ok(())
195 }
196
197 async fn handle_incoming_call(
198 this: ModelHandle<Self>,
199 envelope: TypedEnvelope<proto::IncomingCall>,
200 _: Arc<Client>,
201 mut cx: AsyncAppContext,
202 ) -> Result<proto::Ack> {
203 let call = IncomingCall {
204 room_id: envelope.payload.room_id,
205 participants: this
206 .update(&mut cx, |this, cx| {
207 this.get_users(envelope.payload.participant_user_ids, cx)
208 })
209 .await?,
210 caller: this
211 .update(&mut cx, |this, cx| {
212 this.get_user(envelope.payload.caller_user_id, cx)
213 })
214 .await?,
215 };
216 this.update(&mut cx, |this, _| {
217 *this.incoming_call.0.borrow_mut() = Some(call);
218 });
219
220 Ok(proto::Ack {})
221 }
222
223 async fn handle_cancel_call(
224 this: ModelHandle<Self>,
225 _: TypedEnvelope<proto::CancelCall>,
226 _: Arc<Client>,
227 mut cx: AsyncAppContext,
228 ) -> Result<()> {
229 this.update(&mut cx, |this, _| {
230 *this.incoming_call.0.borrow_mut() = None;
231 });
232 Ok(())
233 }
234
    /// Returns the most recently received invite info, or `None` if none has been received.
    pub fn invite_info(&self) -> Option<&InviteInfo> {
        self.invite_info.as_ref()
    }
238
    /// Returns a new receiver for the incoming-call watch channel.
    pub fn incoming_call(&self) -> watch::Receiver<Option<IncomingCall>> {
        self.incoming_call.1.clone()
    }
242
243 pub fn decline_call(&mut self) -> Result<()> {
244 if let Some(client) = self.client.upgrade() {
245 client.send(proto::DeclineCall {})?;
246 }
247 Ok(())
248 }
249
250 async fn handle_update_contacts(
251 this: ModelHandle<Self>,
252 message: TypedEnvelope<proto::UpdateContacts>,
253 _: Arc<Client>,
254 mut cx: AsyncAppContext,
255 ) -> Result<()> {
256 this.update(&mut cx, |this, _| {
257 this.update_contacts_tx
258 .unbounded_send(UpdateContacts::Update(message.payload))
259 .unwrap();
260 });
261 Ok(())
262 }
263
    /// Applies one queued [`UpdateContacts`] message.
    ///
    /// - `Wait`: drops the barrier sender and completes immediately.
    /// - `Clear`: empties the contact and contact-request lists, then drops the barrier sender.
    /// - `Update`: loads every user referenced by the message, then applies removals and
    ///   insertions to the three sorted lists, emitting `Event::Contact` where appropriate,
    ///   and finally notifies observers.
    ///
    /// The returned task fails if any referenced user cannot be loaded; in that case none
    /// of the lists are modified.
    fn update_contacts(
        &mut self,
        message: UpdateContacts,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                // NOTE(review): unlike the `Update` branch, this branch never calls
                // `cx.notify()` — confirm observers do not rely on being notified here.
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect the ids of every user mentioned by the message (deduplicated).
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(|this, mut cx| async move {
                    load_users.await?;

                    // All users were fetched and cached by the `get_users` call above, so the
                    // sequential `get_user` calls below are cache hits.
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    for contact in message.contacts {
                        let should_notify = contact.should_notify;
                        updated_contacts.push((
                            Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
                            should_notify,
                        ));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            let user = this
                                .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
                                .await?;
                            (user, request.should_notify)
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    // Everything below happens in a single synchronous update of the store.
                    this.update(&mut cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for (updated_contact, should_notify) in updated_contacts {
                            if should_notify {
                                cx.emit(Event::Contact {
                                    user: updated_contact.user.clone(),
                                    kind: ContactEventKind::Accepted,
                                });
                            }
                            // The list stays sorted by `github_login`: replace on a hit,
                            // insert at the search position on a miss.
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for (user, should_notify) in incoming_requests {
                            if should_notify {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Requested,
                                });
                            }

                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    });

                    Ok(())
                })
            }
        }
    }
404
    /// Returns the current contacts, sorted by `github_login`.
    pub fn contacts(&self) -> &[Arc<Contact>] {
        &self.contacts
    }
408
409 pub fn has_contact(&self, user: &Arc<User>) -> bool {
410 self.contacts
411 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
412 .is_ok()
413 }
414
    /// Returns the users with an incoming contact request, sorted by `github_login`.
    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
        &self.incoming_contact_requests
    }
418
    /// Returns the users with an outgoing contact request, sorted by `github_login`.
    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
        &self.outgoing_contact_requests
    }
422
    /// Returns whether at least one contact-related request about `user` is still in flight.
    pub fn is_contact_request_pending(&self, user: &User) -> bool {
        self.pending_contact_requests.contains_key(&user.id)
    }
426
427 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
428 if self
429 .contacts
430 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
431 .is_ok()
432 {
433 ContactRequestStatus::RequestAccepted
434 } else if self
435 .outgoing_contact_requests
436 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
437 .is_ok()
438 {
439 ContactRequestStatus::RequestSent
440 } else if self
441 .incoming_contact_requests
442 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
443 .is_ok()
444 {
445 ContactRequestStatus::RequestReceived
446 } else {
447 ContactRequestStatus::None
448 }
449 }
450
451 pub fn request_contact(
452 &mut self,
453 responder_id: u64,
454 cx: &mut ModelContext<Self>,
455 ) -> Task<Result<()>> {
456 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
457 }
458
459 pub fn remove_contact(
460 &mut self,
461 user_id: u64,
462 cx: &mut ModelContext<Self>,
463 ) -> Task<Result<()>> {
464 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
465 }
466
467 pub fn respond_to_contact_request(
468 &mut self,
469 requester_id: u64,
470 accept: bool,
471 cx: &mut ModelContext<Self>,
472 ) -> Task<Result<()>> {
473 self.perform_contact_request(
474 requester_id,
475 proto::RespondToContactRequest {
476 requester_id,
477 response: if accept {
478 proto::ContactRequestResponse::Accept
479 } else {
480 proto::ContactRequestResponse::Decline
481 } as i32,
482 },
483 cx,
484 )
485 }
486
487 pub fn dismiss_contact_request(
488 &mut self,
489 requester_id: u64,
490 cx: &mut ModelContext<Self>,
491 ) -> Task<Result<()>> {
492 let client = self.client.upgrade();
493 cx.spawn_weak(|_, _| async move {
494 client
495 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
496 .request(proto::RespondToContactRequest {
497 requester_id,
498 response: proto::ContactRequestResponse::Dismiss as i32,
499 })
500 .await?;
501 Ok(())
502 })
503 }
504
505 fn perform_contact_request<T: RequestMessage>(
506 &mut self,
507 user_id: u64,
508 request: T,
509 cx: &mut ModelContext<Self>,
510 ) -> Task<Result<()>> {
511 let client = self.client.upgrade();
512 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
513 cx.notify();
514
515 cx.spawn(|this, mut cx| async move {
516 let response = client
517 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
518 .request(request)
519 .await;
520 this.update(&mut cx, |this, cx| {
521 if let Entry::Occupied(mut request_count) =
522 this.pending_contact_requests.entry(user_id)
523 {
524 *request_count.get_mut() -= 1;
525 if *request_count.get() == 0 {
526 request_count.remove();
527 }
528 }
529 cx.notify();
530 });
531 response?;
532 Ok(())
533 })
534 }
535
536 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
537 let (tx, mut rx) = postage::barrier::channel();
538 self.update_contacts_tx
539 .unbounded_send(UpdateContacts::Clear(tx))
540 .unwrap();
541 async move {
542 rx.next().await;
543 }
544 }
545
546 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
547 let (tx, mut rx) = postage::barrier::channel();
548 self.update_contacts_tx
549 .unbounded_send(UpdateContacts::Wait(tx))
550 .unwrap();
551 async move {
552 rx.next().await;
553 }
554 }
555
556 pub fn get_users(
557 &mut self,
558 user_ids: Vec<u64>,
559 cx: &mut ModelContext<Self>,
560 ) -> Task<Result<Vec<Arc<User>>>> {
561 let mut user_ids_to_fetch = user_ids.clone();
562 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
563
564 cx.spawn(|this, mut cx| async move {
565 if !user_ids_to_fetch.is_empty() {
566 this.update(&mut cx, |this, cx| {
567 this.load_users(
568 proto::GetUsers {
569 user_ids: user_ids_to_fetch,
570 },
571 cx,
572 )
573 })
574 .await?;
575 }
576
577 this.read_with(&cx, |this, _| {
578 user_ids
579 .iter()
580 .map(|user_id| {
581 this.users
582 .get(user_id)
583 .cloned()
584 .ok_or_else(|| anyhow!("user {} not found", user_id))
585 })
586 .collect()
587 })
588 })
589 }
590
591 pub fn fuzzy_search_users(
592 &mut self,
593 query: String,
594 cx: &mut ModelContext<Self>,
595 ) -> Task<Result<Vec<Arc<User>>>> {
596 self.load_users(proto::FuzzySearchUsers { query }, cx)
597 }
598
599 pub fn get_user(
600 &mut self,
601 user_id: u64,
602 cx: &mut ModelContext<Self>,
603 ) -> Task<Result<Arc<User>>> {
604 if let Some(user) = self.users.get(&user_id).cloned() {
605 return cx.foreground().spawn(async move { Ok(user) });
606 }
607
608 let load_users = self.get_users(vec![user_id], cx);
609 cx.spawn(|this, mut cx| async move {
610 load_users.await?;
611 this.update(&mut cx, |this, _| {
612 this.users
613 .get(&user_id)
614 .cloned()
615 .ok_or_else(|| anyhow!("server responded with no users"))
616 })
617 })
618 }
619
    /// Returns the value currently held by the current-user watch channel
    /// (`None` when no user has been published or after sign-out).
    pub fn current_user(&self) -> Option<Arc<User>> {
        self.current_user.borrow().clone()
    }
623
    /// Returns a new receiver for the current-user watch channel.
    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
        self.current_user.clone()
    }
627
628 fn load_users(
629 &mut self,
630 request: impl RequestMessage<Response = UsersResponse>,
631 cx: &mut ModelContext<Self>,
632 ) -> Task<Result<Vec<Arc<User>>>> {
633 let client = self.client.clone();
634 let http = self.http.clone();
635 cx.spawn_weak(|this, mut cx| async move {
636 if let Some(rpc) = client.upgrade() {
637 let response = rpc.request(request).await.context("error loading users")?;
638 let users = future::join_all(
639 response
640 .users
641 .into_iter()
642 .map(|user| User::new(user, http.as_ref())),
643 )
644 .await;
645
646 if let Some(this) = this.upgrade(&cx) {
647 this.update(&mut cx, |this, _| {
648 for user in &users {
649 this.users.insert(user.id, user.clone());
650 }
651 });
652 }
653 Ok(users)
654 } else {
655 Ok(Vec::new())
656 }
657 })
658 }
659}
660
661impl User {
662 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
663 Arc::new(User {
664 id: message.id,
665 github_login: message.github_login,
666 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
667 })
668 }
669}
670
671impl Contact {
672 async fn from_proto(
673 contact: proto::Contact,
674 user_store: &ModelHandle<UserStore>,
675 cx: &mut AsyncAppContext,
676 ) -> Result<Self> {
677 let user = user_store
678 .update(cx, |user_store, cx| {
679 user_store.get_user(contact.user_id, cx)
680 })
681 .await?;
682 Ok(Self {
683 user,
684 online: contact.online,
685 })
686 }
687}
688
689async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
690 let mut response = http
691 .get(url, Default::default(), true)
692 .await
693 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
694
695 if !response.status().is_success() {
696 return Err(anyhow!("avatar request failed {:?}", response.status()));
697 }
698
699 let mut body = Vec::new();
700 response
701 .body_mut()
702 .read_to_end(&mut body)
703 .await
704 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
705 let format = image::guess_format(&body)?;
706 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
707 Ok(ImageData::new(image))
708}