user.rs

  1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
  2use anyhow::{anyhow, Context, Result};
  3use futures::{future, AsyncReadExt, Future};
  4use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
  5use postage::{prelude::Stream, sink::Sink, watch};
  6use rpc::proto::{RequestMessage, UsersResponse};
  7use std::{
  8    collections::{hash_map::Entry, HashMap, HashSet},
  9    sync::{Arc, Weak},
 10};
 11use util::TryFutureExt as _;
 12
 13#[derive(Debug)]
 14pub struct User {
 15    pub id: u64,
 16    pub github_login: String,
 17    pub avatar: Option<Arc<ImageData>>,
 18}
 19
 20#[derive(Debug)]
 21pub struct Contact {
 22    pub user: Arc<User>,
 23    pub projects: Vec<ProjectMetadata>,
 24}
 25
 26#[derive(Debug)]
 27pub struct ProjectMetadata {
 28    pub id: u64,
 29    pub is_shared: bool,
 30    pub worktree_root_names: Vec<String>,
 31    pub guests: Vec<Arc<User>>,
 32}
 33
 34#[derive(Debug, Clone, Copy)]
 35pub enum ContactRequestStatus {
 36    None,
 37    SendingRequest,
 38    Requested,
 39    RequestAccepted,
 40}
 41
 42pub struct UserStore {
 43    users: HashMap<u64, Arc<User>>,
 44    update_contacts_tx: watch::Sender<Option<proto::UpdateContacts>>,
 45    current_user: watch::Receiver<Option<Arc<User>>>,
 46    contacts: Vec<Arc<Contact>>,
 47    incoming_contact_requests: Vec<Arc<User>>,
 48    outgoing_contact_requests: Vec<Arc<User>>,
 49    pending_contact_requests: HashMap<u64, usize>,
 50    client: Weak<Client>,
 51    http: Arc<dyn HttpClient>,
 52    _maintain_contacts: Task<()>,
 53    _maintain_current_user: Task<()>,
 54}
 55
 56pub enum Event {}
 57
 58impl Entity for UserStore {
 59    type Event = Event;
 60}
 61
 62impl UserStore {
 63    pub fn new(
 64        client: Arc<Client>,
 65        http: Arc<dyn HttpClient>,
 66        cx: &mut ModelContext<Self>,
 67    ) -> Self {
 68        let (mut current_user_tx, current_user_rx) = watch::channel();
 69        let (update_contacts_tx, mut update_contacts_rx) =
 70            watch::channel::<Option<proto::UpdateContacts>>();
 71        let rpc_subscription =
 72            client.add_message_handler(cx.handle(), Self::handle_update_contacts);
 73        Self {
 74            users: Default::default(),
 75            current_user: current_user_rx,
 76            contacts: Default::default(),
 77            incoming_contact_requests: Default::default(),
 78            outgoing_contact_requests: Default::default(),
 79            client: Arc::downgrade(&client),
 80            update_contacts_tx,
 81            http,
 82            _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
 83                let _subscription = rpc_subscription;
 84                while let Some(message) = update_contacts_rx.recv().await {
 85                    if let Some((message, this)) = message.zip(this.upgrade(&cx)) {
 86                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
 87                            .log_err()
 88                            .await;
 89                    }
 90                }
 91            }),
 92            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
 93                let mut status = client.status();
 94                while let Some(status) = status.recv().await {
 95                    match status {
 96                        Status::Connected { .. } => {
 97                            if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
 98                                let user = this
 99                                    .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
100                                    .log_err()
101                                    .await;
102                                current_user_tx.send(user).await.ok();
103                            }
104                        }
105                        Status::SignedOut => {
106                            current_user_tx.send(None).await.ok();
107                        }
108                        _ => {}
109                    }
110                }
111            }),
112            pending_contact_requests: Default::default(),
113        }
114    }
115
116    async fn handle_update_contacts(
117        this: ModelHandle<Self>,
118        msg: TypedEnvelope<proto::UpdateContacts>,
119        _: Arc<Client>,
120        mut cx: AsyncAppContext,
121    ) -> Result<()> {
122        this.update(&mut cx, |this, _| {
123            *this.update_contacts_tx.borrow_mut() = Some(msg.payload);
124        });
125        Ok(())
126    }
127
128    fn update_contacts(
129        &mut self,
130        message: proto::UpdateContacts,
131        cx: &mut ModelContext<Self>,
132    ) -> Task<Result<()>> {
133        log::info!("update contacts on client {:?}", message);
134        let mut user_ids = HashSet::new();
135        for contact in &message.contacts {
136            user_ids.insert(contact.user_id);
137            user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
138        }
139        user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
140        user_ids.extend(message.outgoing_requests.iter());
141
142        let load_users = self.get_users(user_ids.into_iter().collect(), cx);
143        cx.spawn(|this, mut cx| async move {
144            load_users.await?;
145
146            // Users are fetched in parallel above and cached in call to get_users
147            // No need to paralellize here
148            let mut updated_contacts = Vec::new();
149            for contact in message.contacts {
150                updated_contacts.push(Arc::new(
151                    Contact::from_proto(contact, &this, &mut cx).await?,
152                ));
153            }
154
155            let mut incoming_requests = Vec::new();
156            for request in message.incoming_requests {
157                incoming_requests.push(
158                    this.update(&mut cx, |this, cx| {
159                        this.fetch_user(request.requester_id, cx)
160                    })
161                    .await?,
162                );
163            }
164
165            let mut outgoing_requests = Vec::new();
166            for requested_user_id in message.outgoing_requests {
167                outgoing_requests.push(
168                    this.update(&mut cx, |this, cx| this.fetch_user(requested_user_id, cx))
169                        .await?,
170                );
171            }
172
173            let removed_contacts =
174                HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
175            let removed_incoming_requests =
176                HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
177            let removed_outgoing_requests =
178                HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
179
180            this.update(&mut cx, |this, cx| {
181                // Remove contacts
182                this.contacts
183                    .retain(|contact| !removed_contacts.contains(&contact.user.id));
184                // Update existing contacts and insert new ones
185                for updated_contact in updated_contacts {
186                    match this
187                        .contacts
188                        .binary_search_by_key(&&updated_contact.user.github_login, |contact| {
189                            &contact.user.github_login
190                        }) {
191                        Ok(ix) => this.contacts[ix] = updated_contact,
192                        Err(ix) => this.contacts.insert(ix, updated_contact),
193                    }
194                }
195                cx.notify();
196
197                // Remove incoming contact requests
198                this.incoming_contact_requests
199                    .retain(|user| !removed_incoming_requests.contains(&user.id));
200                // Update existing incoming requests and insert new ones
201                for request in incoming_requests {
202                    match this
203                        .incoming_contact_requests
204                        .binary_search_by_key(&&request.github_login, |contact| {
205                            &contact.github_login
206                        }) {
207                        Ok(ix) => this.incoming_contact_requests[ix] = request,
208                        Err(ix) => this.incoming_contact_requests.insert(ix, request),
209                    }
210                }
211
212                // Remove outgoing contact requests
213                this.outgoing_contact_requests
214                    .retain(|user| !removed_outgoing_requests.contains(&user.id));
215                // Update existing incoming requests and insert new ones
216                for request in outgoing_requests {
217                    match this
218                        .outgoing_contact_requests
219                        .binary_search_by_key(&&request.github_login, |contact| {
220                            &contact.github_login
221                        }) {
222                        Ok(ix) => this.outgoing_contact_requests[ix] = request,
223                        Err(ix) => this.outgoing_contact_requests.insert(ix, request),
224                    }
225                }
226            });
227
228            Ok(())
229        })
230    }
231
232    pub fn contacts(&self) -> &[Arc<Contact>] {
233        &self.contacts
234    }
235
236    pub fn has_contact(&self, user: &Arc<User>) -> bool {
237        self.contacts
238            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
239            .is_ok()
240    }
241
242    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
243        &self.incoming_contact_requests
244    }
245
246    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
247        &self.outgoing_contact_requests
248    }
249
250    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
251        if self
252            .contacts
253            .binary_search_by_key(&&user.id, |contact| &contact.user.id)
254            .is_ok()
255        {
256            ContactRequestStatus::RequestAccepted
257        } else if self
258            .outgoing_contact_requests
259            .binary_search_by_key(&&user.id, |user| &user.id)
260            .is_ok()
261        {
262            ContactRequestStatus::Requested
263        } else if self.pending_contact_requests.contains_key(&user.id) {
264            ContactRequestStatus::SendingRequest
265        } else {
266            ContactRequestStatus::None
267        }
268    }
269
270    pub fn request_contact(
271        &mut self,
272        responder_id: u64,
273        cx: &mut ModelContext<Self>,
274    ) -> Task<Result<()>> {
275        let client = self.client.upgrade();
276        *self
277            .pending_contact_requests
278            .entry(responder_id)
279            .or_insert(0) += 1;
280        cx.notify();
281
282        cx.spawn(|this, mut cx| async move {
283            let request = client
284                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
285                .request(proto::RequestContact { responder_id });
286            request.await?;
287            this.update(&mut cx, |this, cx| {
288                if let Entry::Occupied(mut request_count) =
289                    this.pending_contact_requests.entry(responder_id)
290                {
291                    *request_count.get_mut() -= 1;
292                    if *request_count.get() == 0 {
293                        request_count.remove();
294                    }
295                }
296                cx.notify();
297            });
298            Ok(())
299        })
300    }
301
302    pub fn remove_contact(
303        &mut self,
304        user_id: u64,
305        cx: &mut ModelContext<Self>,
306    ) -> Task<Result<()>> {
307        let client = self.client.upgrade();
308        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
309        cx.notify();
310
311        cx.spawn(|this, mut cx| async move {
312            let request = client
313                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
314                .request(proto::RemoveContact { user_id });
315            request.await?;
316            this.update(&mut cx, |this, cx| {
317                if let Entry::Occupied(mut request_count) =
318                    this.pending_contact_requests.entry(user_id)
319                {
320                    *request_count.get_mut() -= 1;
321                    if *request_count.get() == 0 {
322                        request_count.remove();
323                    }
324                }
325                cx.notify();
326            });
327            Ok(())
328        })
329    }
330
331    pub fn respond_to_contact_request(
332        &self,
333        requester_id: u64,
334        accept: bool,
335    ) -> impl Future<Output = Result<()>> {
336        let client = self.client.upgrade();
337        async move {
338            client
339                .ok_or_else(|| anyhow!("not logged in"))?
340                .request(proto::RespondToContactRequest {
341                    requester_id,
342                    response: if accept {
343                        proto::ContactRequestResponse::Accept
344                    } else {
345                        proto::ContactRequestResponse::Reject
346                    } as i32,
347                })
348                .await?;
349            Ok(())
350        }
351    }
352
353    #[cfg(any(test, feature = "test-support"))]
354    pub fn clear_contacts(&mut self) {
355        self.contacts.clear();
356        self.incoming_contact_requests.clear();
357        self.outgoing_contact_requests.clear();
358    }
359
360    pub fn get_users(
361        &mut self,
362        mut user_ids: Vec<u64>,
363        cx: &mut ModelContext<Self>,
364    ) -> Task<Result<Vec<Arc<User>>>> {
365        user_ids.retain(|id| !self.users.contains_key(id));
366        self.load_users(proto::GetUsers { user_ids }, cx)
367    }
368
369    pub fn fuzzy_search_users(
370        &mut self,
371        query: String,
372        cx: &mut ModelContext<Self>,
373    ) -> Task<Result<Vec<Arc<User>>>> {
374        self.load_users(proto::FuzzySearchUsers { query }, cx)
375    }
376
377    pub fn fetch_user(
378        &mut self,
379        user_id: u64,
380        cx: &mut ModelContext<Self>,
381    ) -> Task<Result<Arc<User>>> {
382        if let Some(user) = self.users.get(&user_id).cloned() {
383            return cx.foreground().spawn(async move { Ok(user) });
384        }
385
386        let load_users = self.get_users(vec![user_id], cx);
387        cx.spawn(|this, mut cx| async move {
388            load_users.await?;
389            this.update(&mut cx, |this, _| {
390                this.users
391                    .get(&user_id)
392                    .cloned()
393                    .ok_or_else(|| anyhow!("server responded with no users"))
394            })
395        })
396    }
397
398    pub fn current_user(&self) -> Option<Arc<User>> {
399        self.current_user.borrow().clone()
400    }
401
402    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
403        self.current_user.clone()
404    }
405
406    fn load_users(
407        &mut self,
408        request: impl RequestMessage<Response = UsersResponse>,
409        cx: &mut ModelContext<Self>,
410    ) -> Task<Result<Vec<Arc<User>>>> {
411        let client = self.client.clone();
412        let http = self.http.clone();
413        cx.spawn_weak(|this, mut cx| async move {
414            if let Some(rpc) = client.upgrade() {
415                let response = rpc.request(request).await.context("error loading users")?;
416                let users = future::join_all(
417                    response
418                        .users
419                        .into_iter()
420                        .map(|user| User::new(user, http.as_ref())),
421                )
422                .await;
423
424                if let Some(this) = this.upgrade(&cx) {
425                    this.update(&mut cx, |this, _| {
426                        for user in &users {
427                            this.users.insert(user.id, user.clone());
428                        }
429                    });
430                }
431                Ok(users)
432            } else {
433                Ok(Vec::new())
434            }
435        })
436    }
437}
438
439impl User {
440    async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
441        Arc::new(User {
442            id: message.id,
443            github_login: message.github_login,
444            avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
445        })
446    }
447}
448
449impl Contact {
450    async fn from_proto(
451        contact: proto::Contact,
452        user_store: &ModelHandle<UserStore>,
453        cx: &mut AsyncAppContext,
454    ) -> Result<Self> {
455        let user = user_store
456            .update(cx, |user_store, cx| {
457                user_store.fetch_user(contact.user_id, cx)
458            })
459            .await?;
460        let mut projects = Vec::new();
461        for project in contact.projects {
462            let mut guests = Vec::new();
463            for participant_id in project.guests {
464                guests.push(
465                    user_store
466                        .update(cx, |user_store, cx| {
467                            user_store.fetch_user(participant_id, cx)
468                        })
469                        .await?,
470                );
471            }
472            projects.push(ProjectMetadata {
473                id: project.id,
474                worktree_root_names: project.worktree_root_names.clone(),
475                is_shared: project.is_shared,
476                guests,
477            });
478        }
479        Ok(Self { user, projects })
480    }
481}
482
483async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
484    let mut response = http
485        .get(url, Default::default(), true)
486        .await
487        .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
488
489    if !response.status().is_success() {
490        return Err(anyhow!("avatar request failed {:?}", response.status()));
491    }
492
493    let mut body = Vec::new();
494    response
495        .body_mut()
496        .read_to_end(&mut body)
497        .await
498        .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
499    let format = image::guess_format(&body)?;
500    let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
501    Ok(ImageData::new(image))
502}