user.rs

  1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
  2use anyhow::{anyhow, Context, Result};
  3use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
  4use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
  5use postage::{prelude::Stream, sink::Sink, watch};
  6use rpc::proto::{RequestMessage, UsersResponse};
  7use std::{
  8    collections::{hash_map::Entry, HashMap, HashSet},
  9    sync::{Arc, Weak},
 10};
 11use util::TryFutureExt as _;
 12
 13#[derive(Debug)]
 14pub struct User {
 15    pub id: u64,
 16    pub github_login: String,
 17    pub avatar: Option<Arc<ImageData>>,
 18}
 19
 20#[derive(Debug)]
 21pub struct Contact {
 22    pub user: Arc<User>,
 23    pub online: bool,
 24    pub projects: Vec<ProjectMetadata>,
 25}
 26
 27#[derive(Debug)]
 28pub struct ProjectMetadata {
 29    pub id: u64,
 30    pub is_shared: bool,
 31    pub worktree_root_names: Vec<String>,
 32    pub guests: Vec<Arc<User>>,
 33}
 34
 35#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 36pub enum ContactRequestStatus {
 37    None,
 38    Pending,
 39    RequestSent,
 40    RequestReceived,
 41    RequestAccepted,
 42}
 43
 44pub struct UserStore {
 45    users: HashMap<u64, Arc<User>>,
 46    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
 47    current_user: watch::Receiver<Option<Arc<User>>>,
 48    contacts: Vec<Arc<Contact>>,
 49    incoming_contact_requests: Vec<Arc<User>>,
 50    outgoing_contact_requests: Vec<Arc<User>>,
 51    pending_contact_requests: HashMap<u64, usize>,
 52    client: Weak<Client>,
 53    http: Arc<dyn HttpClient>,
 54    _maintain_contacts: Task<()>,
 55    _maintain_current_user: Task<()>,
 56}
 57
 58pub enum Event {}
 59
 60impl Entity for UserStore {
 61    type Event = Event;
 62}
 63
 64enum UpdateContacts {
 65    Update(proto::UpdateContacts),
 66    Clear(postage::barrier::Sender),
 67}
 68
 69impl UserStore {
 70    pub fn new(
 71        client: Arc<Client>,
 72        http: Arc<dyn HttpClient>,
 73        cx: &mut ModelContext<Self>,
 74    ) -> Self {
 75        let (mut current_user_tx, current_user_rx) = watch::channel();
 76        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
 77        let rpc_subscription =
 78            client.add_message_handler(cx.handle(), Self::handle_update_contacts);
 79        Self {
 80            users: Default::default(),
 81            current_user: current_user_rx,
 82            contacts: Default::default(),
 83            incoming_contact_requests: Default::default(),
 84            outgoing_contact_requests: Default::default(),
 85            client: Arc::downgrade(&client),
 86            update_contacts_tx,
 87            http,
 88            _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
 89                let _subscription = rpc_subscription;
 90                while let Some(message) = update_contacts_rx.next().await {
 91                    if let Some(this) = this.upgrade(&cx) {
 92                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
 93                            .log_err()
 94                            .await;
 95                    }
 96                }
 97            }),
 98            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
 99                let mut status = client.status();
100                while let Some(status) = status.recv().await {
101                    match status {
102                        Status::Connected { .. } => {
103                            if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
104                                let user = this
105                                    .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
106                                    .log_err()
107                                    .await;
108                                current_user_tx.send(user).await.ok();
109                            }
110                        }
111                        Status::SignedOut => {
112                            current_user_tx.send(None).await.ok();
113                            if let Some(this) = this.upgrade(&cx) {
114                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
115                            }
116                        }
117                        Status::ConnectionLost => {
118                            if let Some(this) = this.upgrade(&cx) {
119                                this.update(&mut cx, |this, _| this.clear_contacts()).await;
120                            }
121                        }
122                        _ => {}
123                    }
124                }
125            }),
126            pending_contact_requests: Default::default(),
127        }
128    }
129
130    async fn handle_update_contacts(
131        this: ModelHandle<Self>,
132        msg: TypedEnvelope<proto::UpdateContacts>,
133        _: Arc<Client>,
134        mut cx: AsyncAppContext,
135    ) -> Result<()> {
136        this.update(&mut cx, |this, _| {
137            this.update_contacts_tx
138                .unbounded_send(UpdateContacts::Update(msg.payload))
139                .unwrap();
140        });
141        Ok(())
142    }
143
144    fn update_contacts(
145        &mut self,
146        message: UpdateContacts,
147        cx: &mut ModelContext<Self>,
148    ) -> Task<Result<()>> {
149        match message {
150            UpdateContacts::Clear(barrier) => {
151                self.contacts.clear();
152                self.incoming_contact_requests.clear();
153                self.outgoing_contact_requests.clear();
154                drop(barrier);
155                Task::ready(Ok(()))
156            }
157            UpdateContacts::Update(message) => {
158                log::info!(
159                    "update contacts on client {}: {:?}",
160                    self.client.upgrade().unwrap().id,
161                    message
162                );
163                let mut user_ids = HashSet::new();
164                for contact in &message.contacts {
165                    user_ids.insert(contact.user_id);
166                    user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
167                }
168                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
169                user_ids.extend(message.outgoing_requests.iter());
170
171                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
172                cx.spawn(|this, mut cx| async move {
173                    load_users.await?;
174
175                    // Users are fetched in parallel above and cached in call to get_users
176                    // No need to paralellize here
177                    let mut updated_contacts = Vec::new();
178                    for contact in message.contacts {
179                        updated_contacts.push(Arc::new(
180                            Contact::from_proto(contact, &this, &mut cx).await?,
181                        ));
182                    }
183
184                    let mut incoming_requests = Vec::new();
185                    for request in message.incoming_requests {
186                        incoming_requests.push(
187                            this.update(&mut cx, |this, cx| {
188                                this.fetch_user(request.requester_id, cx)
189                            })
190                            .await?,
191                        );
192                    }
193
194                    let mut outgoing_requests = Vec::new();
195                    for requested_user_id in message.outgoing_requests {
196                        outgoing_requests.push(
197                            this.update(&mut cx, |this, cx| this.fetch_user(requested_user_id, cx))
198                                .await?,
199                        );
200                    }
201
202                    let removed_contacts =
203                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
204                    let removed_incoming_requests =
205                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
206                    let removed_outgoing_requests =
207                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
208
209                    this.update(&mut cx, |this, cx| {
210                        // Remove contacts
211                        this.contacts
212                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
213                        // Update existing contacts and insert new ones
214                        for updated_contact in updated_contacts {
215                            match this.contacts.binary_search_by_key(
216                                &&updated_contact.user.github_login,
217                                |contact| &contact.user.github_login,
218                            ) {
219                                Ok(ix) => this.contacts[ix] = updated_contact,
220                                Err(ix) => this.contacts.insert(ix, updated_contact),
221                            }
222                        }
223
224                        // Remove incoming contact requests
225                        this.incoming_contact_requests
226                            .retain(|user| !removed_incoming_requests.contains(&user.id));
227                        // Update existing incoming requests and insert new ones
228                        for request in incoming_requests {
229                            match this
230                                .incoming_contact_requests
231                                .binary_search_by_key(&&request.github_login, |contact| {
232                                    &contact.github_login
233                                }) {
234                                Ok(ix) => this.incoming_contact_requests[ix] = request,
235                                Err(ix) => this.incoming_contact_requests.insert(ix, request),
236                            }
237                        }
238
239                        // Remove outgoing contact requests
240                        this.outgoing_contact_requests
241                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
242                        // Update existing incoming requests and insert new ones
243                        for request in outgoing_requests {
244                            match this
245                                .outgoing_contact_requests
246                                .binary_search_by_key(&&request.github_login, |contact| {
247                                    &contact.github_login
248                                }) {
249                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
250                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
251                            }
252                        }
253
254                        cx.notify();
255                    });
256
257                    Ok(())
258                })
259            }
260        }
261    }
262
263    pub fn contacts(&self) -> &[Arc<Contact>] {
264        &self.contacts
265    }
266
267    pub fn has_contact(&self, user: &Arc<User>) -> bool {
268        self.contacts
269            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
270            .is_ok()
271    }
272
273    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
274        &self.incoming_contact_requests
275    }
276
277    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
278        &self.outgoing_contact_requests
279    }
280
281    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
282        if self.pending_contact_requests.contains_key(&user.id) {
283            ContactRequestStatus::Pending
284        } else if self
285            .contacts
286            .binary_search_by_key(&&user.id, |contact| &contact.user.id)
287            .is_ok()
288        {
289            ContactRequestStatus::RequestAccepted
290        } else if self
291            .outgoing_contact_requests
292            .binary_search_by_key(&&user.id, |user| &user.id)
293            .is_ok()
294        {
295            ContactRequestStatus::RequestSent
296        } else if self
297            .incoming_contact_requests
298            .binary_search_by_key(&&user.id, |user| &user.id)
299            .is_ok()
300        {
301            ContactRequestStatus::RequestReceived
302        } else {
303            ContactRequestStatus::None
304        }
305    }
306
307    pub fn request_contact(
308        &mut self,
309        responder_id: u64,
310        cx: &mut ModelContext<Self>,
311    ) -> Task<Result<()>> {
312        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
313    }
314
315    pub fn remove_contact(
316        &mut self,
317        user_id: u64,
318        cx: &mut ModelContext<Self>,
319    ) -> Task<Result<()>> {
320        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
321    }
322
323    pub fn respond_to_contact_request(
324        &mut self,
325        requester_id: u64,
326        accept: bool,
327        cx: &mut ModelContext<Self>,
328    ) -> Task<Result<()>> {
329        self.perform_contact_request(
330            requester_id,
331            proto::RespondToContactRequest {
332                requester_id,
333                response: if accept {
334                    proto::ContactRequestResponse::Accept
335                } else {
336                    proto::ContactRequestResponse::Reject
337                } as i32,
338            },
339            cx,
340        )
341    }
342
343    fn perform_contact_request<T: RequestMessage>(
344        &mut self,
345        user_id: u64,
346        request: T,
347        cx: &mut ModelContext<Self>,
348    ) -> Task<Result<()>> {
349        let client = self.client.upgrade();
350        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
351        cx.notify();
352
353        cx.spawn(|this, mut cx| async move {
354            let response = client
355                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
356                .request(request)
357                .await;
358            this.update(&mut cx, |this, cx| {
359                if let Entry::Occupied(mut request_count) =
360                    this.pending_contact_requests.entry(user_id)
361                {
362                    *request_count.get_mut() -= 1;
363                    if *request_count.get() == 0 {
364                        request_count.remove();
365                    }
366                }
367                cx.notify();
368            });
369            response?;
370            Ok(())
371        })
372    }
373
374    pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
375        let (tx, mut rx) = postage::barrier::channel();
376        self.update_contacts_tx
377            .unbounded_send(UpdateContacts::Clear(tx))
378            .unwrap();
379        async move {
380            rx.recv().await;
381        }
382    }
383
384    pub fn get_users(
385        &mut self,
386        mut user_ids: Vec<u64>,
387        cx: &mut ModelContext<Self>,
388    ) -> Task<Result<()>> {
389        user_ids.retain(|id| !self.users.contains_key(id));
390        if user_ids.is_empty() {
391            Task::ready(Ok(()))
392        } else {
393            let load = self.load_users(proto::GetUsers { user_ids }, cx);
394            cx.foreground().spawn(async move {
395                load.await?;
396                Ok(())
397            })
398        }
399    }
400
401    pub fn fuzzy_search_users(
402        &mut self,
403        query: String,
404        cx: &mut ModelContext<Self>,
405    ) -> Task<Result<Vec<Arc<User>>>> {
406        self.load_users(proto::FuzzySearchUsers { query }, cx)
407    }
408
409    pub fn fetch_user(
410        &mut self,
411        user_id: u64,
412        cx: &mut ModelContext<Self>,
413    ) -> Task<Result<Arc<User>>> {
414        if let Some(user) = self.users.get(&user_id).cloned() {
415            return cx.foreground().spawn(async move { Ok(user) });
416        }
417
418        let load_users = self.get_users(vec![user_id], cx);
419        cx.spawn(|this, mut cx| async move {
420            load_users.await?;
421            this.update(&mut cx, |this, _| {
422                this.users
423                    .get(&user_id)
424                    .cloned()
425                    .ok_or_else(|| anyhow!("server responded with no users"))
426            })
427        })
428    }
429
430    pub fn current_user(&self) -> Option<Arc<User>> {
431        self.current_user.borrow().clone()
432    }
433
434    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
435        self.current_user.clone()
436    }
437
438    fn load_users(
439        &mut self,
440        request: impl RequestMessage<Response = UsersResponse>,
441        cx: &mut ModelContext<Self>,
442    ) -> Task<Result<Vec<Arc<User>>>> {
443        let client = self.client.clone();
444        let http = self.http.clone();
445        cx.spawn_weak(|this, mut cx| async move {
446            if let Some(rpc) = client.upgrade() {
447                let response = rpc.request(request).await.context("error loading users")?;
448                let users = future::join_all(
449                    response
450                        .users
451                        .into_iter()
452                        .map(|user| User::new(user, http.as_ref())),
453                )
454                .await;
455
456                if let Some(this) = this.upgrade(&cx) {
457                    this.update(&mut cx, |this, _| {
458                        for user in &users {
459                            this.users.insert(user.id, user.clone());
460                        }
461                    });
462                }
463                Ok(users)
464            } else {
465                Ok(Vec::new())
466            }
467        })
468    }
469}
470
471impl User {
472    async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
473        Arc::new(User {
474            id: message.id,
475            github_login: message.github_login,
476            avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
477        })
478    }
479}
480
481impl Contact {
482    async fn from_proto(
483        contact: proto::Contact,
484        user_store: &ModelHandle<UserStore>,
485        cx: &mut AsyncAppContext,
486    ) -> Result<Self> {
487        let user = user_store
488            .update(cx, |user_store, cx| {
489                user_store.fetch_user(contact.user_id, cx)
490            })
491            .await?;
492        let mut projects = Vec::new();
493        for project in contact.projects {
494            let mut guests = Vec::new();
495            for participant_id in project.guests {
496                guests.push(
497                    user_store
498                        .update(cx, |user_store, cx| {
499                            user_store.fetch_user(participant_id, cx)
500                        })
501                        .await?,
502                );
503            }
504            projects.push(ProjectMetadata {
505                id: project.id,
506                worktree_root_names: project.worktree_root_names.clone(),
507                is_shared: project.is_shared,
508                guests,
509            });
510        }
511        Ok(Self {
512            user,
513            online: contact.online,
514            projects,
515        })
516    }
517
518    pub fn non_empty_projects(&self) -> impl Iterator<Item = &ProjectMetadata> {
519        self.projects
520            .iter()
521            .filter(|project| !project.worktree_root_names.is_empty())
522    }
523}
524
525async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
526    let mut response = http
527        .get(url, Default::default(), true)
528        .await
529        .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
530
531    if !response.status().is_success() {
532        return Err(anyhow!("avatar request failed {:?}", response.status()));
533    }
534
535    let mut body = Vec::new();
536    response
537        .body_mut()
538        .read_to_end(&mut body)
539        .await
540        .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
541    let format = image::guess_format(&body)?;
542    let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
543    Ok(ImageData::new(image))
544}