user.rs

  1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
  2use crate::incoming_call::IncomingCall;
  3use anyhow::{anyhow, Context, Result};
  4use collections::{hash_map::Entry, HashMap, HashSet};
  5use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
  6use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
  7use postage::{sink::Sink, watch};
  8use rpc::proto::{RequestMessage, UsersResponse};
  9use std::sync::{Arc, Weak};
 10use util::TryFutureExt as _;
 11
 12#[derive(Default, Debug)]
 13pub struct User {
 14    pub id: u64,
 15    pub github_login: String,
 16    pub avatar: Option<Arc<ImageData>>,
 17}
 18
 19impl PartialOrd for User {
 20    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 21        Some(self.cmp(other))
 22    }
 23}
 24
 25impl Ord for User {
 26    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 27        self.github_login.cmp(&other.github_login)
 28    }
 29}
 30
 31impl PartialEq for User {
 32    fn eq(&self, other: &Self) -> bool {
 33        self.id == other.id && self.github_login == other.github_login
 34    }
 35}
 36
 37impl Eq for User {}
 38
 39#[derive(Debug, PartialEq)]
 40pub struct Contact {
 41    pub user: Arc<User>,
 42    pub online: bool,
 43}
 44
 45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 46pub enum ContactRequestStatus {
 47    None,
 48    RequestSent,
 49    RequestReceived,
 50    RequestAccepted,
 51}
 52
 53pub struct UserStore {
 54    users: HashMap<u64, Arc<User>>,
 55    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
 56    current_user: watch::Receiver<Option<Arc<User>>>,
 57    contacts: Vec<Arc<Contact>>,
 58    incoming_contact_requests: Vec<Arc<User>>,
 59    outgoing_contact_requests: Vec<Arc<User>>,
 60    pending_contact_requests: HashMap<u64, usize>,
 61    invite_info: Option<InviteInfo>,
 62    incoming_call: (
 63        watch::Sender<Option<IncomingCall>>,
 64        watch::Receiver<Option<IncomingCall>>,
 65    ),
 66    client: Weak<Client>,
 67    http: Arc<dyn HttpClient>,
 68    _maintain_contacts: Task<()>,
 69    _maintain_current_user: Task<()>,
 70}
 71
 72#[derive(Clone)]
 73pub struct InviteInfo {
 74    pub count: u32,
 75    pub url: Arc<str>,
 76}
 77
 78pub enum Event {
 79    Contact {
 80        user: Arc<User>,
 81        kind: ContactEventKind,
 82    },
 83    ShowContacts,
 84}
 85
 86#[derive(Clone, Copy)]
 87pub enum ContactEventKind {
 88    Requested,
 89    Accepted,
 90    Cancelled,
 91}
 92
 93impl Entity for UserStore {
 94    type Event = Event;
 95}
 96
 97enum UpdateContacts {
 98    Update(proto::UpdateContacts),
 99    Wait(postage::barrier::Sender),
100    Clear(postage::barrier::Sender),
101}
102
103impl UserStore {
104    pub fn new(
105        client: Arc<Client>,
106        http: Arc<dyn HttpClient>,
107        cx: &mut ModelContext<Self>,
108    ) -> Self {
109        let (mut current_user_tx, current_user_rx) = watch::channel();
110        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
111        let rpc_subscriptions = vec![
112            client.add_message_handler(cx.handle(), Self::handle_update_contacts),
113            client.add_message_handler(cx.handle(), Self::handle_update_invite_info),
114            client.add_message_handler(cx.handle(), Self::handle_show_contacts),
115            client.add_request_handler(cx.handle(), Self::handle_incoming_call),
116            client.add_message_handler(cx.handle(), Self::handle_cancel_call),
117        ];
118        Self {
119            users: Default::default(),
120            current_user: current_user_rx,
121            contacts: Default::default(),
122            incoming_contact_requests: Default::default(),
123            outgoing_contact_requests: Default::default(),
124            invite_info: None,
125            incoming_call: watch::channel(),
126            client: Arc::downgrade(&client),
127            update_contacts_tx,
128            http,
129            _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
130                let _subscriptions = rpc_subscriptions;
131                while let Some(message) = update_contacts_rx.next().await {
132                    if let Some(this) = this.upgrade(&cx) {
133                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
134                            .log_err()
135                            .await;
136                    }
137                }
138            }),
139            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
140                let mut status = client.status();
141                while let Some(status) = status.next().await {
142                    match status {
143                        Status::Connected { .. } => {
144                            if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
145                                let user = this
146                                    .update(&mut cx, |this, cx| this.get_user(user_id, cx))
147                                    .log_err()
148                                    .await;
149                                current_user_tx.send(user).await.ok();
150                            }
151                        }
152                        Status::SignedOut => {
153                            current_user_tx.send(None).await.ok();
154                            if let Some(this) = this.upgrade(&cx) {
155                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
156                            }
157                        }
158                        Status::ConnectionLost => {
159                            if let Some(this) = this.upgrade(&cx) {
160                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
161                            }
162                        }
163                        _ => {}
164                    }
165                }
166            }),
167            pending_contact_requests: Default::default(),
168        }
169    }
170
171    async fn handle_update_invite_info(
172        this: ModelHandle<Self>,
173        message: TypedEnvelope<proto::UpdateInviteInfo>,
174        _: Arc<Client>,
175        mut cx: AsyncAppContext,
176    ) -> Result<()> {
177        this.update(&mut cx, |this, cx| {
178            this.invite_info = Some(InviteInfo {
179                url: Arc::from(message.payload.url),
180                count: message.payload.count,
181            });
182            cx.notify();
183        });
184        Ok(())
185    }
186
187    async fn handle_show_contacts(
188        this: ModelHandle<Self>,
189        _: TypedEnvelope<proto::ShowContacts>,
190        _: Arc<Client>,
191        mut cx: AsyncAppContext,
192    ) -> Result<()> {
193        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
194        Ok(())
195    }
196
197    async fn handle_incoming_call(
198        this: ModelHandle<Self>,
199        envelope: TypedEnvelope<proto::IncomingCall>,
200        _: Arc<Client>,
201        mut cx: AsyncAppContext,
202    ) -> Result<proto::Ack> {
203        let call = IncomingCall {
204            room_id: envelope.payload.room_id,
205            participants: this
206                .update(&mut cx, |this, cx| {
207                    this.get_users(envelope.payload.participant_user_ids, cx)
208                })
209                .await?,
210            caller: this
211                .update(&mut cx, |this, cx| {
212                    this.get_user(envelope.payload.caller_user_id, cx)
213                })
214                .await?,
215            initial_project_id: envelope.payload.initial_project_id,
216        };
217        this.update(&mut cx, |this, _| {
218            *this.incoming_call.0.borrow_mut() = Some(call);
219        });
220
221        Ok(proto::Ack {})
222    }
223
224    async fn handle_cancel_call(
225        this: ModelHandle<Self>,
226        _: TypedEnvelope<proto::CancelCall>,
227        _: Arc<Client>,
228        mut cx: AsyncAppContext,
229    ) -> Result<()> {
230        this.update(&mut cx, |this, _| {
231            *this.incoming_call.0.borrow_mut() = None;
232        });
233        Ok(())
234    }
235
236    pub fn invite_info(&self) -> Option<&InviteInfo> {
237        self.invite_info.as_ref()
238    }
239
240    pub fn incoming_call(&self) -> watch::Receiver<Option<IncomingCall>> {
241        self.incoming_call.1.clone()
242    }
243
244    pub fn decline_call(&mut self) -> Result<()> {
245        if let Some(client) = self.client.upgrade() {
246            client.send(proto::DeclineCall {})?;
247        }
248        Ok(())
249    }
250
251    async fn handle_update_contacts(
252        this: ModelHandle<Self>,
253        message: TypedEnvelope<proto::UpdateContacts>,
254        _: Arc<Client>,
255        mut cx: AsyncAppContext,
256    ) -> Result<()> {
257        this.update(&mut cx, |this, _| {
258            this.update_contacts_tx
259                .unbounded_send(UpdateContacts::Update(message.payload))
260                .unwrap();
261        });
262        Ok(())
263    }
264
265    fn update_contacts(
266        &mut self,
267        message: UpdateContacts,
268        cx: &mut ModelContext<Self>,
269    ) -> Task<Result<()>> {
270        match message {
271            UpdateContacts::Wait(barrier) => {
272                drop(barrier);
273                Task::ready(Ok(()))
274            }
275            UpdateContacts::Clear(barrier) => {
276                self.contacts.clear();
277                self.incoming_contact_requests.clear();
278                self.outgoing_contact_requests.clear();
279                drop(barrier);
280                Task::ready(Ok(()))
281            }
282            UpdateContacts::Update(message) => {
283                let mut user_ids = HashSet::default();
284                for contact in &message.contacts {
285                    user_ids.insert(contact.user_id);
286                }
287                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
288                user_ids.extend(message.outgoing_requests.iter());
289
290                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
291                cx.spawn(|this, mut cx| async move {
292                    load_users.await?;
293
294                    // Users are fetched in parallel above and cached in call to get_users
295                    // No need to paralellize here
296                    let mut updated_contacts = Vec::new();
297                    for contact in message.contacts {
298                        let should_notify = contact.should_notify;
299                        updated_contacts.push((
300                            Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
301                            should_notify,
302                        ));
303                    }
304
305                    let mut incoming_requests = Vec::new();
306                    for request in message.incoming_requests {
307                        incoming_requests.push({
308                            let user = this
309                                .update(&mut cx, |this, cx| this.get_user(request.requester_id, cx))
310                                .await?;
311                            (user, request.should_notify)
312                        });
313                    }
314
315                    let mut outgoing_requests = Vec::new();
316                    for requested_user_id in message.outgoing_requests {
317                        outgoing_requests.push(
318                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))
319                                .await?,
320                        );
321                    }
322
323                    let removed_contacts =
324                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
325                    let removed_incoming_requests =
326                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
327                    let removed_outgoing_requests =
328                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
329
330                    this.update(&mut cx, |this, cx| {
331                        // Remove contacts
332                        this.contacts
333                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
334                        // Update existing contacts and insert new ones
335                        for (updated_contact, should_notify) in updated_contacts {
336                            if should_notify {
337                                cx.emit(Event::Contact {
338                                    user: updated_contact.user.clone(),
339                                    kind: ContactEventKind::Accepted,
340                                });
341                            }
342                            match this.contacts.binary_search_by_key(
343                                &&updated_contact.user.github_login,
344                                |contact| &contact.user.github_login,
345                            ) {
346                                Ok(ix) => this.contacts[ix] = updated_contact,
347                                Err(ix) => this.contacts.insert(ix, updated_contact),
348                            }
349                        }
350
351                        // Remove incoming contact requests
352                        this.incoming_contact_requests.retain(|user| {
353                            if removed_incoming_requests.contains(&user.id) {
354                                cx.emit(Event::Contact {
355                                    user: user.clone(),
356                                    kind: ContactEventKind::Cancelled,
357                                });
358                                false
359                            } else {
360                                true
361                            }
362                        });
363                        // Update existing incoming requests and insert new ones
364                        for (user, should_notify) in incoming_requests {
365                            if should_notify {
366                                cx.emit(Event::Contact {
367                                    user: user.clone(),
368                                    kind: ContactEventKind::Requested,
369                                });
370                            }
371
372                            match this
373                                .incoming_contact_requests
374                                .binary_search_by_key(&&user.github_login, |contact| {
375                                    &contact.github_login
376                                }) {
377                                Ok(ix) => this.incoming_contact_requests[ix] = user,
378                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
379                            }
380                        }
381
382                        // Remove outgoing contact requests
383                        this.outgoing_contact_requests
384                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
385                        // Update existing incoming requests and insert new ones
386                        for request in outgoing_requests {
387                            match this
388                                .outgoing_contact_requests
389                                .binary_search_by_key(&&request.github_login, |contact| {
390                                    &contact.github_login
391                                }) {
392                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
393                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
394                            }
395                        }
396
397                        cx.notify();
398                    });
399
400                    Ok(())
401                })
402            }
403        }
404    }
405
406    pub fn contacts(&self) -> &[Arc<Contact>] {
407        &self.contacts
408    }
409
410    pub fn has_contact(&self, user: &Arc<User>) -> bool {
411        self.contacts
412            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
413            .is_ok()
414    }
415
416    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
417        &self.incoming_contact_requests
418    }
419
420    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
421        &self.outgoing_contact_requests
422    }
423
424    pub fn is_contact_request_pending(&self, user: &User) -> bool {
425        self.pending_contact_requests.contains_key(&user.id)
426    }
427
428    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
429        if self
430            .contacts
431            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
432            .is_ok()
433        {
434            ContactRequestStatus::RequestAccepted
435        } else if self
436            .outgoing_contact_requests
437            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
438            .is_ok()
439        {
440            ContactRequestStatus::RequestSent
441        } else if self
442            .incoming_contact_requests
443            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
444            .is_ok()
445        {
446            ContactRequestStatus::RequestReceived
447        } else {
448            ContactRequestStatus::None
449        }
450    }
451
452    pub fn request_contact(
453        &mut self,
454        responder_id: u64,
455        cx: &mut ModelContext<Self>,
456    ) -> Task<Result<()>> {
457        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
458    }
459
460    pub fn remove_contact(
461        &mut self,
462        user_id: u64,
463        cx: &mut ModelContext<Self>,
464    ) -> Task<Result<()>> {
465        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
466    }
467
468    pub fn respond_to_contact_request(
469        &mut self,
470        requester_id: u64,
471        accept: bool,
472        cx: &mut ModelContext<Self>,
473    ) -> Task<Result<()>> {
474        self.perform_contact_request(
475            requester_id,
476            proto::RespondToContactRequest {
477                requester_id,
478                response: if accept {
479                    proto::ContactRequestResponse::Accept
480                } else {
481                    proto::ContactRequestResponse::Decline
482                } as i32,
483            },
484            cx,
485        )
486    }
487
488    pub fn dismiss_contact_request(
489        &mut self,
490        requester_id: u64,
491        cx: &mut ModelContext<Self>,
492    ) -> Task<Result<()>> {
493        let client = self.client.upgrade();
494        cx.spawn_weak(|_, _| async move {
495            client
496                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
497                .request(proto::RespondToContactRequest {
498                    requester_id,
499                    response: proto::ContactRequestResponse::Dismiss as i32,
500                })
501                .await?;
502            Ok(())
503        })
504    }
505
506    fn perform_contact_request<T: RequestMessage>(
507        &mut self,
508        user_id: u64,
509        request: T,
510        cx: &mut ModelContext<Self>,
511    ) -> Task<Result<()>> {
512        let client = self.client.upgrade();
513        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
514        cx.notify();
515
516        cx.spawn(|this, mut cx| async move {
517            let response = client
518                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
519                .request(request)
520                .await;
521            this.update(&mut cx, |this, cx| {
522                if let Entry::Occupied(mut request_count) =
523                    this.pending_contact_requests.entry(user_id)
524                {
525                    *request_count.get_mut() -= 1;
526                    if *request_count.get() == 0 {
527                        request_count.remove();
528                    }
529                }
530                cx.notify();
531            });
532            response?;
533            Ok(())
534        })
535    }
536
537    pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
538        let (tx, mut rx) = postage::barrier::channel();
539        self.update_contacts_tx
540            .unbounded_send(UpdateContacts::Clear(tx))
541            .unwrap();
542        async move {
543            rx.next().await;
544        }
545    }
546
547    pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
548        let (tx, mut rx) = postage::barrier::channel();
549        self.update_contacts_tx
550            .unbounded_send(UpdateContacts::Wait(tx))
551            .unwrap();
552        async move {
553            rx.next().await;
554        }
555    }
556
557    pub fn get_users(
558        &mut self,
559        user_ids: Vec<u64>,
560        cx: &mut ModelContext<Self>,
561    ) -> Task<Result<Vec<Arc<User>>>> {
562        let mut user_ids_to_fetch = user_ids.clone();
563        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
564
565        cx.spawn(|this, mut cx| async move {
566            if !user_ids_to_fetch.is_empty() {
567                this.update(&mut cx, |this, cx| {
568                    this.load_users(
569                        proto::GetUsers {
570                            user_ids: user_ids_to_fetch,
571                        },
572                        cx,
573                    )
574                })
575                .await?;
576            }
577
578            this.read_with(&cx, |this, _| {
579                user_ids
580                    .iter()
581                    .map(|user_id| {
582                        this.users
583                            .get(user_id)
584                            .cloned()
585                            .ok_or_else(|| anyhow!("user {} not found", user_id))
586                    })
587                    .collect()
588            })
589        })
590    }
591
592    pub fn fuzzy_search_users(
593        &mut self,
594        query: String,
595        cx: &mut ModelContext<Self>,
596    ) -> Task<Result<Vec<Arc<User>>>> {
597        self.load_users(proto::FuzzySearchUsers { query }, cx)
598    }
599
600    pub fn get_user(
601        &mut self,
602        user_id: u64,
603        cx: &mut ModelContext<Self>,
604    ) -> Task<Result<Arc<User>>> {
605        if let Some(user) = self.users.get(&user_id).cloned() {
606            return cx.foreground().spawn(async move { Ok(user) });
607        }
608
609        let load_users = self.get_users(vec![user_id], cx);
610        cx.spawn(|this, mut cx| async move {
611            load_users.await?;
612            this.update(&mut cx, |this, _| {
613                this.users
614                    .get(&user_id)
615                    .cloned()
616                    .ok_or_else(|| anyhow!("server responded with no users"))
617            })
618        })
619    }
620
621    pub fn current_user(&self) -> Option<Arc<User>> {
622        self.current_user.borrow().clone()
623    }
624
625    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
626        self.current_user.clone()
627    }
628
629    fn load_users(
630        &mut self,
631        request: impl RequestMessage<Response = UsersResponse>,
632        cx: &mut ModelContext<Self>,
633    ) -> Task<Result<Vec<Arc<User>>>> {
634        let client = self.client.clone();
635        let http = self.http.clone();
636        cx.spawn_weak(|this, mut cx| async move {
637            if let Some(rpc) = client.upgrade() {
638                let response = rpc.request(request).await.context("error loading users")?;
639                let users = future::join_all(
640                    response
641                        .users
642                        .into_iter()
643                        .map(|user| User::new(user, http.as_ref())),
644                )
645                .await;
646
647                if let Some(this) = this.upgrade(&cx) {
648                    this.update(&mut cx, |this, _| {
649                        for user in &users {
650                            this.users.insert(user.id, user.clone());
651                        }
652                    });
653                }
654                Ok(users)
655            } else {
656                Ok(Vec::new())
657            }
658        })
659    }
660}
661
662impl User {
663    async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
664        Arc::new(User {
665            id: message.id,
666            github_login: message.github_login,
667            avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
668        })
669    }
670}
671
672impl Contact {
673    async fn from_proto(
674        contact: proto::Contact,
675        user_store: &ModelHandle<UserStore>,
676        cx: &mut AsyncAppContext,
677    ) -> Result<Self> {
678        let user = user_store
679            .update(cx, |user_store, cx| {
680                user_store.get_user(contact.user_id, cx)
681            })
682            .await?;
683        Ok(Self {
684            user,
685            online: contact.online,
686        })
687    }
688}
689
690async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
691    let mut response = http
692        .get(url, Default::default(), true)
693        .await
694        .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
695
696    if !response.status().is_success() {
697        return Err(anyhow!("avatar request failed {:?}", response.status()));
698    }
699
700    let mut body = Vec::new();
701    response
702        .body_mut()
703        .read_to_end(&mut body)
704        .await
705        .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
706    let format = image::guess_format(&body)?;
707    let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
708    Ok(ImageData::new(image))
709}