user.rs

  1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
  2use crate::incoming_call::IncomingCall;
  3use anyhow::{anyhow, Context, Result};
  4use collections::{hash_map::Entry, HashMap, HashSet};
  5use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
  6use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
  7use postage::{sink::Sink, watch};
  8use rpc::proto::{RequestMessage, UsersResponse};
  9use std::sync::{Arc, Weak};
 10use util::TryFutureExt as _;
 11
 12#[derive(Debug)]
 13pub struct User {
 14    pub id: u64,
 15    pub github_login: String,
 16    pub avatar: Option<Arc<ImageData>>,
 17}
 18
 19impl PartialOrd for User {
 20    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 21        Some(self.cmp(other))
 22    }
 23}
 24
 25impl Ord for User {
 26    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 27        self.github_login.cmp(&other.github_login)
 28    }
 29}
 30
 31impl PartialEq for User {
 32    fn eq(&self, other: &Self) -> bool {
 33        self.id == other.id && self.github_login == other.github_login
 34    }
 35}
 36
 37impl Eq for User {}
 38
 39#[derive(Debug, PartialEq)]
 40pub struct Contact {
 41    pub user: Arc<User>,
 42    pub online: bool,
 43}
 44
 45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 46pub enum ContactRequestStatus {
 47    None,
 48    RequestSent,
 49    RequestReceived,
 50    RequestAccepted,
 51}
 52
 53pub struct UserStore {
 54    users: HashMap<u64, Arc<User>>,
 55    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
 56    current_user: watch::Receiver<Option<Arc<User>>>,
 57    contacts: Vec<Arc<Contact>>,
 58    incoming_contact_requests: Vec<Arc<User>>,
 59    outgoing_contact_requests: Vec<Arc<User>>,
 60    pending_contact_requests: HashMap<u64, usize>,
 61    invite_info: Option<InviteInfo>,
 62    incoming_call: (
 63        watch::Sender<Option<IncomingCall>>,
 64        watch::Receiver<Option<IncomingCall>>,
 65    ),
 66    client: Weak<Client>,
 67    http: Arc<dyn HttpClient>,
 68    _maintain_contacts: Task<()>,
 69    _maintain_current_user: Task<()>,
 70}
 71
 72#[derive(Clone)]
 73pub struct InviteInfo {
 74    pub count: u32,
 75    pub url: Arc<str>,
 76}
 77
 78pub enum Event {
 79    Contact {
 80        user: Arc<User>,
 81        kind: ContactEventKind,
 82    },
 83    ShowContacts,
 84}
 85
 86#[derive(Clone, Copy)]
 87pub enum ContactEventKind {
 88    Requested,
 89    Accepted,
 90    Cancelled,
 91}
 92
 93impl Entity for UserStore {
 94    type Event = Event;
 95}
 96
 97enum UpdateContacts {
 98    Update(proto::UpdateContacts),
 99    Wait(postage::barrier::Sender),
100    Clear(postage::barrier::Sender),
101}
102
103impl UserStore {
104    pub fn new(
105        client: Arc<Client>,
106        http: Arc<dyn HttpClient>,
107        cx: &mut ModelContext<Self>,
108    ) -> Self {
109        let (mut current_user_tx, current_user_rx) = watch::channel();
110        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
111        let rpc_subscriptions = vec![
112            client.add_message_handler(cx.handle(), Self::handle_update_contacts),
113            client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
114            client.add_message_handler(cx.handle(), Self::handle_show_contacts),
115            client.add_request_handler(cx.handle(), Self::handle_incoming_call),
116            client.add_message_handler(cx.handle(), Self::handle_cancel_call),
117        ];
118        Self {
119            users: Default::default(),
120            current_user: current_user_rx,
121            contacts: Default::default(),
122            incoming_contact_requests: Default::default(),
123            outgoing_contact_requests: Default::default(),
124            invite_info: None,
125            incoming_call: watch::channel(),
126            client: Arc::downgrade(&client),
127            update_contacts_tx,
128            http,
129            _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
130                let _subscriptions = rpc_subscriptions;
131                while let Some(message) = update_contacts_rx.next().await {
132                    if let Some(this) = this.upgrade(&cx) {
133                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
134                            .log_err()
135                            .await;
136                    }
137                }
138            }),
139            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
140                let mut status = client.status();
141                while let Some(status) = status.next().await {
142                    match status {
143                        Status::Connected { .. } => {
144                            if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
145                                let user = this
146                                    .update(&mut cx, |this, cx| this.get_user(user_id, cx))
147                                    .log_err()
148                                    .await;
149                                current_user_tx.send(user).await.ok();
150                            }
151                        }
152                        Status::SignedOut => {
153                            current_user_tx.send(None).await.ok();
154                            if let Some(this) = this.upgrade(&cx) {
155                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
156                            }
157                        }
158                        Status::ConnectionLost => {
159                            if let Some(this) = this.upgrade(&cx) {
160                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
161                            }
162                        }
163                        _ => {}
164                    }
165                }
166            }),
167            pending_contact_requests: Default::default(),
168        }
169    }
170
171    async fn handle_update_invite_info(
172        this: ModelHandle<Self>,
173        message: TypedEnvelope<proto::UpdateInviteInfo>,
174        _: Arc<Client>,
175        mut cx: AsyncAppContext,
176    ) -> Result<()> {
177        this.update(&mut cx, |this, cx| {
178            this.invite_info = Some(InviteInfo {
179                url: Arc::from(message.payload.url),
180                count: message.payload.count,
181            });
182            cx.notify();
183        });
184        Ok(())
185    }
186
187    async fn handle_show_contacts(
188        this: ModelHandle<Self>,
189        _: TypedEnvelope<proto::ShowContacts>,
190        _: Arc<Client>,
191        mut cx: AsyncAppContext,
192    ) -> Result<()> {
193        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
194        Ok(())
195    }
196
197    async fn handle_incoming_call(
198        this: ModelHandle<Self>,
199        envelope: TypedEnvelope<proto::IncomingCall>,
200        _: Arc<Client>,
201        mut cx: AsyncAppContext,
202    ) -> Result<proto::Ack> {
203        let call = IncomingCall {
204            room_id: envelope.payload.room_id,
205            participants: this
206                .update(&mut cx, |this, cx| {
207                    this.get_users(envelope.payload.participant_user_ids, cx)
208                })
209                .await?,
210            caller: this
211                .update(&mut cx, |this, cx| {
212                    this.get_user(envelope.payload.caller_user_id, cx)
213                })
214                .await?,
215        };
216        this.update(&mut cx, |this, _| {
217            *this.incoming_call.0.borrow_mut() = Some(call);
218        });
219
220        Ok(proto::Ack {})
221    }
222
223    async fn handle_cancel_call(
224        this: ModelHandle<Self>,
225        _: TypedEnvelope<proto::CancelCall>,
226        _: Arc<Client>,
227        mut cx: AsyncAppContext,
228    ) -> Result<()> {
229        this.update(&mut cx, |this, _| {
230            *this.incoming_call.0.borrow_mut() = None;
231        });
232        Ok(())
233    }
234
235    pub fn invite_info(&self) -> Option<&InviteInfo> {
236        self.invite_info.as_ref()
237    }
238
239    pub fn incoming_call(&self) -> watch::Receiver<Option<IncomingCall>> {
240        self.incoming_call.1.clone()
241    }
242
243    pub fn decline_call(&mut self) -> Result<()> {
244        if let Some(client) = self.client.upgrade() {
245            client.send(proto::DeclineCall {})?;
246        }
247        Ok(())
248    }
249
250    async fn handle_update_contacts(
251        this: ModelHandle<Self>,
252        message: TypedEnvelope<proto::UpdateContacts>,
253        _: Arc<Client>,
254        mut cx: AsyncAppContext,
255    ) -> Result<()> {
256        this.update(&mut cx, |this, _| {
257            this.update_contacts_tx
258                .unbounded_send(UpdateContacts::Update(message.payload))
259                .unwrap();
260        });
261        Ok(())
262    }
263
264    fn update_contacts(
265        &mut self,
266        message: UpdateContacts,
267        cx: &mut ModelContext<Self>,
268    ) -> Task<Result<()>> {
269        match message {
270            UpdateContacts::Wait(barrier) => {
271                drop(barrier);
272                Task::ready(Ok(()))
273            }
274            UpdateContacts::Clear(barrier) => {
275                self.contacts.clear();
276                self.incoming_contact_requests.clear();
277                self.outgoing_contact_requests.clear();
278                drop(barrier);
279                Task::ready(Ok(()))
280            }
281            UpdateContacts::Update(message) => {
282                let mut user_ids = HashSet::default();
283                for contact in &message.contacts {
284                    user_ids.insert(contact.user_id);
285                }
286                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
287                user_ids.extend(message.outgoing_requests.iter());
288
289                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
290                cx.spawn(|this, mut cx| async move {
291                    load_users.await?;
292
293                    // Users are fetched in parallel above and cached in call to get_users
294                    // No need to paralellize here
295                    let mut updated_contacts = Vec::new();
296                    for contact in message.contacts {
297                        let should_notify = contact.should_notify;
298                        updated_contacts.push((
299                            Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
300                            should_notify,
301                        ));
302                    }
303
304                    let mut incoming_requests = Vec::new();
305                    for request in message.incoming_requests {
306                        incoming_requests.push({
307                            let user = this
308                                .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
309                                .await?;
310                            (user, request.should_notify)
311                        });
312                    }
313
314                    let mut outgoing_requests = Vec::new();
315                    for requested_user_id in message.outgoing_requests {
316                        outgoing_requests.push(
317                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
318                                .await?,
319                        );
320                    }
321
322                    let removed_contacts =
323                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
324                    let removed_incoming_requests =
325                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
326                    let removed_outgoing_requests =
327                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
328
329                    this.update(&mut cx, |this, cx| {
330                        // Remove contacts
331                        this.contacts
332                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
333                        // Update existing contacts and insert new ones
334                        for (updated_contact, should_notify) in updated_contacts {
335                            if should_notify {
336                                cx.emit(Event::Contact {
337                                    user: updated_contact.user.clone(),
338                                    kind: ContactEventKind::Accepted,
339                                });
340                            }
341                            match this.contacts.binary_search_by_key(
342                                &&updated_contact.user.github_login,
343                                |contact| &contact.user.github_login,
344                            ) {
345                                Ok(ix) => this.contacts[ix] = updated_contact,
346                                Err(ix) => this.contacts.insert(ix, updated_contact),
347                            }
348                        }
349
350                        // Remove incoming contact requests
351                        this.incoming_contact_requests.retain(|user| {
352                            if removed_incoming_requests.contains(&user.id) {
353                                cx.emit(Event::Contact {
354                                    user: user.clone(),
355                                    kind: ContactEventKind::Cancelled,
356                                });
357                                false
358                            } else {
359                                true
360                            }
361                        });
362                        // Update existing incoming requests and insert new ones
363                        for (user, should_notify) in incoming_requests {
364                            if should_notify {
365                                cx.emit(Event::Contact {
366                                    user: user.clone(),
367                                    kind: ContactEventKind::Requested,
368                                });
369                            }
370
371                            match this
372                                .incoming_contact_requests
373                                .binary_search_by_key(&&user.github_login, |contact| {
374                                    &contact.github_login
375                                }) {
376                                Ok(ix) => this.incoming_contact_requests[ix] = user,
377                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
378                            }
379                        }
380
381                        // Remove outgoing contact requests
382                        this.outgoing_contact_requests
383                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
384                        // Update existing incoming requests and insert new ones
385                        for request in outgoing_requests {
386                            match this
387                                .outgoing_contact_requests
388                                .binary_search_by_key(&&request.github_login, |contact| {
389                                    &contact.github_login
390                                }) {
391                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
392                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
393                            }
394                        }
395
396                        cx.notify();
397                    });
398
399                    Ok(())
400                })
401            }
402        }
403    }
404
405    pub fn contacts(&self) -> &[Arc<Contact>] {
406        &self.contacts
407    }
408
409    pub fn has_contact(&self, user: &Arc<User>) -> bool {
410        self.contacts
411            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
412            .is_ok()
413    }
414
415    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
416        &self.incoming_contact_requests
417    }
418
419    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
420        &self.outgoing_contact_requests
421    }
422
423    pub fn is_contact_request_pending(&self, user: &User) -> bool {
424        self.pending_contact_requests.contains_key(&user.id)
425    }
426
427    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
428        if self
429            .contacts
430            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
431            .is_ok()
432        {
433            ContactRequestStatus::RequestAccepted
434        } else if self
435            .outgoing_contact_requests
436            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
437            .is_ok()
438        {
439            ContactRequestStatus::RequestSent
440        } else if self
441            .incoming_contact_requests
442            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
443            .is_ok()
444        {
445            ContactRequestStatus::RequestReceived
446        } else {
447            ContactRequestStatus::None
448        }
449    }
450
451    pub fn request_contact(
452        &mut self,
453        responder_id: u64,
454        cx: &mut ModelContext<Self>,
455    ) -> Task<Result<()>> {
456        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
457    }
458
459    pub fn remove_contact(
460        &mut self,
461        user_id: u64,
462        cx: &mut ModelContext<Self>,
463    ) -> Task<Result<()>> {
464        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
465    }
466
467    pub fn respond_to_contact_request(
468        &mut self,
469        requester_id: u64,
470        accept: bool,
471        cx: &mut ModelContext<Self>,
472    ) -> Task<Result<()>> {
473        self.perform_contact_request(
474            requester_id,
475            proto::RespondToContactRequest {
476                requester_id,
477                response: if accept {
478                    proto::ContactRequestResponse::Accept
479                } else {
480                    proto::ContactRequestResponse::Decline
481                } as i32,
482            },
483            cx,
484        )
485    }
486
487    pub fn dismiss_contact_request(
488        &mut self,
489        requester_id: u64,
490        cx: &mut ModelContext<Self>,
491    ) -> Task<Result<()>> {
492        let client = self.client.upgrade();
493        cx.spawn_weak(|_, _| async move {
494            client
495                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
496                .request(proto::RespondToContactRequest {
497                    requester_id,
498                    response: proto::ContactRequestResponse::Dismiss as i32,
499                })
500                .await?;
501            Ok(())
502        })
503    }
504
505    fn perform_contact_request<T: RequestMessage>(
506        &mut self,
507        user_id: u64,
508        request: T,
509        cx: &mut ModelContext<Self>,
510    ) -> Task<Result<()>> {
511        let client = self.client.upgrade();
512        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
513        cx.notify();
514
515        cx.spawn(|this, mut cx| async move {
516            let response = client
517                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
518                .request(request)
519                .await;
520            this.update(&mut cx, |this, cx| {
521                if let Entry::Occupied(mut request_count) =
522                    this.pending_contact_requests.entry(user_id)
523                {
524                    *request_count.get_mut() -= 1;
525                    if *request_count.get() == 0 {
526                        request_count.remove();
527                    }
528                }
529                cx.notify();
530            });
531            response?;
532            Ok(())
533        })
534    }
535
536    pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
537        let (tx, mut rx) = postage::barrier::channel();
538        self.update_contacts_tx
539            .unbounded_send(UpdateContacts::Clear(tx))
540            .unwrap();
541        async move {
542            rx.next().await;
543        }
544    }
545
546    pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
547        let (tx, mut rx) = postage::barrier::channel();
548        self.update_contacts_tx
549            .unbounded_send(UpdateContacts::Wait(tx))
550            .unwrap();
551        async move {
552            rx.next().await;
553        }
554    }
555
556    pub fn get_users(
557        &mut self,
558        user_ids: Vec<u64>,
559        cx: &mut ModelContext<Self>,
560    ) -> Task<Result<Vec<Arc<User>>>> {
561        let mut user_ids_to_fetch = user_ids.clone();
562        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
563
564        cx.spawn(|this, mut cx| async move {
565            if !user_ids_to_fetch.is_empty() {
566                this.update(&mut cx, |this, cx| {
567                    this.load_users(
568                        proto::GetUsers {
569                            user_ids: user_ids_to_fetch,
570                        },
571                        cx,
572                    )
573                })
574                .await?;
575            }
576
577            this.read_with(&cx, |this, _| {
578                user_ids
579                    .iter()
580                    .map(|user_id| {
581                        this.users
582                            .get(user_id)
583                            .cloned()
584                            .ok_or_else(|| anyhow!("user {} not found", user_id))
585                    })
586                    .collect()
587            })
588        })
589    }
590
591    pub fn fuzzy_search_users(
592        &mut self,
593        query: String,
594        cx: &mut ModelContext<Self>,
595    ) -> Task<Result<Vec<Arc<User>>>> {
596        self.load_users(proto::FuzzySearchUsers { query }, cx)
597    }
598
599    pub fn get_user(
600        &mut self,
601        user_id: u64,
602        cx: &mut ModelContext<Self>,
603    ) -> Task<Result<Arc<User>>> {
604        if let Some(user) = self.users.get(&user_id).cloned() {
605            return cx.foreground().spawn(async move { Ok(user) });
606        }
607
608        let load_users = self.get_users(vec![user_id], cx);
609        cx.spawn(|this, mut cx| async move {
610            load_users.await?;
611            this.update(&mut cx, |this, _| {
612                this.users
613                    .get(&user_id)
614                    .cloned()
615                    .ok_or_else(|| anyhow!("server responded with no users"))
616            })
617        })
618    }
619
620    pub fn current_user(&self) -> Option<Arc<User>> {
621        self.current_user.borrow().clone()
622    }
623
624    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
625        self.current_user.clone()
626    }
627
628    fn load_users(
629        &mut self,
630        request: impl RequestMessage<Response = UsersResponse>,
631        cx: &mut ModelContext<Self>,
632    ) -> Task<Result<Vec<Arc<User>>>> {
633        let client = self.client.clone();
634        let http = self.http.clone();
635        cx.spawn_weak(|this, mut cx| async move {
636            if let Some(rpc) = client.upgrade() {
637                let response = rpc.request(request).await.context("error loading users")?;
638                let users = future::join_all(
639                    response
640                        .users
641                        .into_iter()
642                        .map(|user| User::new(user, http.as_ref())),
643                )
644                .await;
645
646                if let Some(this) = this.upgrade(&cx) {
647                    this.update(&mut cx, |this, _| {
648                        for user in &users {
649                            this.users.insert(user.id, user.clone());
650                        }
651                    });
652                }
653                Ok(users)
654            } else {
655                Ok(Vec::new())
656            }
657        })
658    }
659}
660
661impl User {
662    async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
663        Arc::new(User {
664            id: message.id,
665            github_login: message.github_login,
666            avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
667        })
668    }
669}
670
671impl Contact {
672    async fn from_proto(
673        contact: proto::Contact,
674        user_store: &ModelHandle<UserStore>,
675        cx: &mut AsyncAppContext,
676    ) -> Result<Self> {
677        let user = user_store
678            .update(cx, |user_store, cx| {
679                user_store.get_user(contact.user_id, cx)
680            })
681            .await?;
682        Ok(Self {
683            user,
684            online: contact.online,
685        })
686    }
687}
688
689async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
690    let mut response = http
691        .get(url, Default::default(), true)
692        .await
693        .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
694
695    if !response.status().is_success() {
696        return Err(anyhow!("avatar request failed {:?}", response.status()));
697    }
698
699    let mut body = Vec::new();
700    response
701        .body_mut()
702        .read_to_end(&mut body)
703        .await
704        .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
705    let format = image::guess_format(&body)?;
706    let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
707    Ok(ImageData::new(image))
708}