user.rs

  1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
  2use anyhow::{anyhow, Context, Result};
  3use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
  4use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
  5use postage::{prelude::Stream, sink::Sink, watch};
  6use rpc::proto::{RequestMessage, UsersResponse};
  7use std::{
  8    collections::{hash_map::Entry, HashMap, HashSet},
  9    sync::{Arc, Weak},
 10};
 11use util::TryFutureExt as _;
 12
 13#[derive(Debug)]
 14pub struct User {
 15    pub id: u64,
 16    pub github_login: String,
 17    pub avatar: Option<Arc<ImageData>>,
 18}
 19
 20#[derive(Debug)]
 21pub struct Contact {
 22    pub user: Arc<User>,
 23    pub online: bool,
 24    pub projects: Vec<ProjectMetadata>,
 25}
 26
 27#[derive(Debug)]
 28pub struct ProjectMetadata {
 29    pub id: u64,
 30    pub is_shared: bool,
 31    pub worktree_root_names: Vec<String>,
 32    pub guests: Vec<Arc<User>>,
 33}
 34
 35#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 36pub enum ContactRequestStatus {
 37    None,
 38    RequestSent,
 39    RequestReceived,
 40    RequestAccepted,
 41}
 42
 43pub struct UserStore {
 44    users: HashMap<u64, Arc<User>>,
 45    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
 46    current_user: watch::Receiver<Option<Arc<User>>>,
 47    contacts: Vec<Arc<Contact>>,
 48    incoming_contact_requests: Vec<Arc<User>>,
 49    outgoing_contact_requests: Vec<Arc<User>>,
 50    pending_contact_requests: HashMap<u64, usize>,
 51    client: Weak<Client>,
 52    http: Arc<dyn HttpClient>,
 53    _maintain_contacts: Task<()>,
 54    _maintain_current_user: Task<()>,
 55}
 56
 57pub enum Event {
 58    NotifyIncomingRequest(Arc<User>),
 59}
 60
 61impl Entity for UserStore {
 62    type Event = Event;
 63}
 64
 65enum UpdateContacts {
 66    Update(proto::UpdateContacts),
 67    Clear(postage::barrier::Sender),
 68}
 69
 70impl UserStore {
 71    pub fn new(
 72        client: Arc<Client>,
 73        http: Arc<dyn HttpClient>,
 74        cx: &mut ModelContext<Self>,
 75    ) -> Self {
 76        let (mut current_user_tx, current_user_rx) = watch::channel();
 77        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
 78        let rpc_subscription =
 79            client.add_message_handler(cx.handle(), Self::handle_update_contacts);
 80        Self {
 81            users: Default::default(),
 82            current_user: current_user_rx,
 83            contacts: Default::default(),
 84            incoming_contact_requests: Default::default(),
 85            outgoing_contact_requests: Default::default(),
 86            client: Arc::downgrade(&client),
 87            update_contacts_tx,
 88            http,
 89            _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
 90                let _subscription = rpc_subscription;
 91                while let Some(message) = update_contacts_rx.next().await {
 92                    if let Some(this) = this.upgrade(&cx) {
 93                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
 94                            .log_err()
 95                            .await;
 96                    }
 97                }
 98            }),
 99            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
100                let mut status = client.status();
101                while let Some(status) = status.recv().await {
102                    match status {
103                        Status::Connected { .. } => {
104                            if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
105                                let user = this
106                                    .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
107                                    .log_err()
108                                    .await;
109                                current_user_tx.send(user).await.ok();
110                            }
111                        }
112                        Status::SignedOut => {
113                            current_user_tx.send(None).await.ok();
114                            if let Some(this) = this.upgrade(&cx) {
115                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
116                            }
117                        }
118                        Status::ConnectionLost => {
119                            if let Some(this) = this.upgrade(&cx) {
120                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
121                            }
122                        }
123                        _ => {}
124                    }
125                }
126            }),
127            pending_contact_requests: Default::default(),
128        }
129    }
130
131    async fn handle_update_contacts(
132        this: ModelHandle<Self>,
133        msg: TypedEnvelope<proto::UpdateContacts>,
134        _: Arc<Client>,
135        mut cx: AsyncAppContext,
136    ) -> Result<()> {
137        this.update(&mut cx, |this, _| {
138            this.update_contacts_tx
139                .unbounded_send(UpdateContacts::Update(msg.payload))
140                .unwrap();
141        });
142        Ok(())
143    }
144
145    fn update_contacts(
146        &mut self,
147        message: UpdateContacts,
148        cx: &mut ModelContext<Self>,
149    ) -> Task<Result<()>> {
150        match message {
151            UpdateContacts::Clear(barrier) => {
152                self.contacts.clear();
153                self.incoming_contact_requests.clear();
154                self.outgoing_contact_requests.clear();
155                drop(barrier);
156                Task::ready(Ok(()))
157            }
158            UpdateContacts::Update(message) => {
159                log::info!(
160                    "update contacts on client {}: {:?}",
161                    self.client.upgrade().unwrap().id,
162                    message
163                );
164                let mut user_ids = HashSet::new();
165                for contact in &message.contacts {
166                    user_ids.insert(contact.user_id);
167                    user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
168                }
169                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
170                user_ids.extend(message.outgoing_requests.iter());
171
172                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
173                cx.spawn(|this, mut cx| async move {
174                    load_users.await?;
175
176                    // Users are fetched in parallel above and cached in call to get_users
177                    // No need to paralellize here
178                    let mut updated_contacts = Vec::new();
179                    for contact in message.contacts {
180                        updated_contacts.push(Arc::new(
181                            Contact::from_proto(contact, &this, &mut cx).await?,
182                        ));
183                    }
184
185                    let mut incoming_requests = Vec::new();
186                    for request in message.incoming_requests {
187                        incoming_requests.push({
188                            let user = this
189                                .update(&mut cx, |this, cx| {
190                                    this.fetch_user(request.requester_id, cx)
191                                })
192                                .await?;
193                            (user, request.should_notify)
194                        });
195                    }
196
197                    let mut outgoing_requests = Vec::new();
198                    for requested_user_id in message.outgoing_requests {
199                        outgoing_requests.push(
200                            this.update(&mut cx, |this, cx| this.fetch_user(requested_user_id, cx))
201                                .await?,
202                        );
203                    }
204
205                    let removed_contacts =
206                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
207                    let removed_incoming_requests =
208                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
209                    let removed_outgoing_requests =
210                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
211
212                    this.update(&mut cx, |this, cx| {
213                        // Remove contacts
214                        this.contacts
215                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
216                        // Update existing contacts and insert new ones
217                        for updated_contact in updated_contacts {
218                            match this.contacts.binary_search_by_key(
219                                &&updated_contact.user.github_login,
220                                |contact| &contact.user.github_login,
221                            ) {
222                                Ok(ix) => this.contacts[ix] = updated_contact,
223                                Err(ix) => this.contacts.insert(ix, updated_contact),
224                            }
225                        }
226
227                        // Remove incoming contact requests
228                        this.incoming_contact_requests
229                            .retain(|user| !removed_incoming_requests.contains(&user.id));
230                        // Update existing incoming requests and insert new ones
231                        for (user, should_notify) in incoming_requests {
232                            if should_notify {
233                                cx.emit(Event::NotifyIncomingRequest(user.clone()));
234                            }
235
236                            match this
237                                .incoming_contact_requests
238                                .binary_search_by_key(&&user.github_login, |contact| {
239                                    &contact.github_login
240                                }) {
241                                Ok(ix) => this.incoming_contact_requests[ix] = user,
242                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
243                            }
244                        }
245
246                        // Remove outgoing contact requests
247                        this.outgoing_contact_requests
248                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
249                        // Update existing incoming requests and insert new ones
250                        for request in outgoing_requests {
251                            match this
252                                .outgoing_contact_requests
253                                .binary_search_by_key(&&request.github_login, |contact| {
254                                    &contact.github_login
255                                }) {
256                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
257                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
258                            }
259                        }
260
261                        cx.notify();
262                    });
263
264                    Ok(())
265                })
266            }
267        }
268    }
269
270    pub fn contacts(&self) -> &[Arc<Contact>] {
271        &self.contacts
272    }
273
274    pub fn has_contact(&self, user: &Arc<User>) -> bool {
275        self.contacts
276            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
277            .is_ok()
278    }
279
280    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
281        &self.incoming_contact_requests
282    }
283
284    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
285        &self.outgoing_contact_requests
286    }
287
288    pub fn is_contact_request_pending(&self, user: &User) -> bool {
289        self.pending_contact_requests.contains_key(&user.id)
290    }
291
292    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
293        if self
294            .contacts
295            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
296            .is_ok()
297        {
298            ContactRequestStatus::RequestAccepted
299        } else if self
300            .outgoing_contact_requests
301            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
302            .is_ok()
303        {
304            ContactRequestStatus::RequestSent
305        } else if self
306            .incoming_contact_requests
307            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
308            .is_ok()
309        {
310            ContactRequestStatus::RequestReceived
311        } else {
312            ContactRequestStatus::None
313        }
314    }
315
316    pub fn request_contact(
317        &mut self,
318        responder_id: u64,
319        cx: &mut ModelContext<Self>,
320    ) -> Task<Result<()>> {
321        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
322    }
323
324    pub fn remove_contact(
325        &mut self,
326        user_id: u64,
327        cx: &mut ModelContext<Self>,
328    ) -> Task<Result<()>> {
329        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
330    }
331
332    pub fn respond_to_contact_request(
333        &mut self,
334        requester_id: u64,
335        accept: bool,
336        cx: &mut ModelContext<Self>,
337    ) -> Task<Result<()>> {
338        self.perform_contact_request(
339            requester_id,
340            proto::RespondToContactRequest {
341                requester_id,
342                response: if accept {
343                    proto::ContactRequestResponse::Accept
344                } else {
345                    proto::ContactRequestResponse::Reject
346                } as i32,
347            },
348            cx,
349        )
350    }
351
352    fn perform_contact_request<T: RequestMessage>(
353        &mut self,
354        user_id: u64,
355        request: T,
356        cx: &mut ModelContext<Self>,
357    ) -> Task<Result<()>> {
358        let client = self.client.upgrade();
359        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
360        cx.notify();
361
362        cx.spawn(|this, mut cx| async move {
363            let response = client
364                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
365                .request(request)
366                .await;
367            this.update(&mut cx, |this, cx| {
368                if let Entry::Occupied(mut request_count) =
369                    this.pending_contact_requests.entry(user_id)
370                {
371                    *request_count.get_mut() -= 1;
372                    if *request_count.get() == 0 {
373                        request_count.remove();
374                    }
375                }
376                cx.notify();
377            });
378            response?;
379            Ok(())
380        })
381    }
382
383    pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
384        let (tx, mut rx) = postage::barrier::channel();
385        self.update_contacts_tx
386            .unbounded_send(UpdateContacts::Clear(tx))
387            .unwrap();
388        async move {
389            rx.recv().await;
390        }
391    }
392
393    pub fn get_users(
394        &mut self,
395        mut user_ids: Vec<u64>,
396        cx: &mut ModelContext<Self>,
397    ) -> Task<Result<()>> {
398        user_ids.retain(|id| !self.users.contains_key(id));
399        if user_ids.is_empty() {
400            Task::ready(Ok(()))
401        } else {
402            let load = self.load_users(proto::GetUsers { user_ids }, cx);
403            cx.foreground().spawn(async move {
404                load.await?;
405                Ok(())
406            })
407        }
408    }
409
410    pub fn fuzzy_search_users(
411        &mut self,
412        query: String,
413        cx: &mut ModelContext<Self>,
414    ) -> Task<Result<Vec<Arc<User>>>> {
415        self.load_users(proto::FuzzySearchUsers { query }, cx)
416    }
417
418    pub fn fetch_user(
419        &mut self,
420        user_id: u64,
421        cx: &mut ModelContext<Self>,
422    ) -> Task<Result<Arc<User>>> {
423        if let Some(user) = self.users.get(&user_id).cloned() {
424            return cx.foreground().spawn(async move { Ok(user) });
425        }
426
427        let load_users = self.get_users(vec![user_id], cx);
428        cx.spawn(|this, mut cx| async move {
429            load_users.await?;
430            this.update(&mut cx, |this, _| {
431                this.users
432                    .get(&user_id)
433                    .cloned()
434                    .ok_or_else(|| anyhow!("server responded with no users"))
435            })
436        })
437    }
438
439    pub fn current_user(&self) -> Option<Arc<User>> {
440        self.current_user.borrow().clone()
441    }
442
443    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
444        self.current_user.clone()
445    }
446
447    fn load_users(
448        &mut self,
449        request: impl RequestMessage<Response = UsersResponse>,
450        cx: &mut ModelContext<Self>,
451    ) -> Task<Result<Vec<Arc<User>>>> {
452        let client = self.client.clone();
453        let http = self.http.clone();
454        cx.spawn_weak(|this, mut cx| async move {
455            if let Some(rpc) = client.upgrade() {
456                let response = rpc.request(request).await.context("error loading users")?;
457                let users = future::join_all(
458                    response
459                        .users
460                        .into_iter()
461                        .map(|user| User::new(user, http.as_ref())),
462                )
463                .await;
464
465                if let Some(this) = this.upgrade(&cx) {
466                    this.update(&mut cx, |this, _| {
467                        for user in &users {
468                            this.users.insert(user.id, user.clone());
469                        }
470                    });
471                }
472                Ok(users)
473            } else {
474                Ok(Vec::new())
475            }
476        })
477    }
478}
479
480impl User {
481    async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
482        Arc::new(User {
483            id: message.id,
484            github_login: message.github_login,
485            avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
486        })
487    }
488}
489
490impl Contact {
491    async fn from_proto(
492        contact: proto::Contact,
493        user_store: &ModelHandle<UserStore>,
494        cx: &mut AsyncAppContext,
495    ) -> Result<Self> {
496        let user = user_store
497            .update(cx, |user_store, cx| {
498                user_store.fetch_user(contact.user_id, cx)
499            })
500            .await?;
501        let mut projects = Vec::new();
502        for project in contact.projects {
503            let mut guests = Vec::new();
504            for participant_id in project.guests {
505                guests.push(
506                    user_store
507                        .update(cx, |user_store, cx| {
508                            user_store.fetch_user(participant_id, cx)
509                        })
510                        .await?,
511                );
512            }
513            projects.push(ProjectMetadata {
514                id: project.id,
515                worktree_root_names: project.worktree_root_names.clone(),
516                is_shared: project.is_shared,
517                guests,
518            });
519        }
520        Ok(Self {
521            user,
522            online: contact.online,
523            projects,
524        })
525    }
526
527    pub fn non_empty_projects(&self) -> impl Iterator<Item = &ProjectMetadata> {
528        self.projects
529            .iter()
530            .filter(|project| !project.worktree_root_names.is_empty())
531    }
532}
533
534async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
535    let mut response = http
536        .get(url, Default::default(), true)
537        .await
538        .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
539
540    if !response.status().is_success() {
541        return Err(anyhow!("avatar request failed {:?}", response.status()));
542    }
543
544    let mut body = Vec::new();
545    response
546        .body_mut()
547        .read_to_end(&mut body)
548        .await
549        .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
550    let format = image::guess_format(&body)?;
551    let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
552    Ok(ImageData::new(image))
553}