user.rs

  1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
  2use anyhow::{anyhow, Result};
  3use futures::{future, AsyncReadExt};
  4use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
  5use postage::{prelude::Stream, sink::Sink, watch};
  6use std::{
  7    collections::{HashMap, HashSet},
  8    sync::{Arc, Weak},
  9};
 10use util::TryFutureExt as _;
 11
 12#[derive(Debug)]
 13pub struct User {
 14    pub id: u64,
 15    pub github_login: String,
 16    pub avatar: Option<Arc<ImageData>>,
 17}
 18
 19#[derive(Debug)]
 20pub struct Contact {
 21    pub user: Arc<User>,
 22    pub projects: Vec<ProjectMetadata>,
 23}
 24
 25#[derive(Debug)]
 26pub struct ProjectMetadata {
 27    pub id: u64,
 28    pub is_shared: bool,
 29    pub worktree_root_names: Vec<String>,
 30    pub guests: Vec<Arc<User>>,
 31}
 32
 33pub struct UserStore {
 34    users: HashMap<u64, Arc<User>>,
 35    update_contacts_tx: watch::Sender<Option<proto::UpdateContacts>>,
 36    current_user: watch::Receiver<Option<Arc<User>>>,
 37    contacts: Arc<[Contact]>,
 38    client: Weak<Client>,
 39    http: Arc<dyn HttpClient>,
 40    _maintain_contacts: Task<()>,
 41    _maintain_current_user: Task<()>,
 42}
 43
 44pub enum Event {}
 45
 46impl Entity for UserStore {
 47    type Event = Event;
 48}
 49
 50impl UserStore {
 51    pub fn new(
 52        client: Arc<Client>,
 53        http: Arc<dyn HttpClient>,
 54        cx: &mut ModelContext<Self>,
 55    ) -> Self {
 56        let (mut current_user_tx, current_user_rx) = watch::channel();
 57        let (update_contacts_tx, mut update_contacts_rx) =
 58            watch::channel::<Option<proto::UpdateContacts>>();
 59        let rpc_subscription =
 60            client.add_message_handler(cx.handle(), Self::handle_update_contacts);
 61        Self {
 62            users: Default::default(),
 63            current_user: current_user_rx,
 64            contacts: Arc::from([]),
 65            client: Arc::downgrade(&client),
 66            update_contacts_tx,
 67            http,
 68            _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
 69                let _subscription = rpc_subscription;
 70                while let Some(message) = update_contacts_rx.recv().await {
 71                    if let Some((message, this)) = message.zip(this.upgrade(&cx)) {
 72                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
 73                            .log_err()
 74                            .await;
 75                    }
 76                }
 77            }),
 78            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
 79                let mut status = client.status();
 80                while let Some(status) = status.recv().await {
 81                    match status {
 82                        Status::Connected { .. } => {
 83                            if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
 84                                let user = this
 85                                    .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
 86                                    .log_err()
 87                                    .await;
 88                                current_user_tx.send(user).await.ok();
 89                            }
 90                        }
 91                        Status::SignedOut => {
 92                            current_user_tx.send(None).await.ok();
 93                        }
 94                        _ => {}
 95                    }
 96                }
 97            }),
 98        }
 99    }
100
101    async fn handle_update_contacts(
102        this: ModelHandle<Self>,
103        msg: TypedEnvelope<proto::UpdateContacts>,
104        _: Arc<Client>,
105        mut cx: AsyncAppContext,
106    ) -> Result<()> {
107        this.update(&mut cx, |this, _| {
108            *this.update_contacts_tx.borrow_mut() = Some(msg.payload);
109        });
110        Ok(())
111    }
112
113    fn update_contacts(
114        &mut self,
115        message: proto::UpdateContacts,
116        cx: &mut ModelContext<Self>,
117    ) -> Task<Result<()>> {
118        let mut user_ids = HashSet::new();
119        for contact in &message.contacts {
120            user_ids.insert(contact.user_id);
121            user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
122        }
123
124        let load_users = self.load_users(user_ids.into_iter().collect(), cx);
125        cx.spawn(|this, mut cx| async move {
126            load_users.await?;
127
128            let mut contacts = Vec::new();
129            for contact in message.contacts {
130                contacts.push(Contact::from_proto(contact, &this, &mut cx).await?);
131            }
132
133            this.update(&mut cx, |this, cx| {
134                contacts.sort_by(|a, b| a.user.github_login.cmp(&b.user.github_login));
135                this.contacts = contacts.into();
136                cx.notify();
137            });
138
139            Ok(())
140        })
141    }
142
143    pub fn contacts(&self) -> &Arc<[Contact]> {
144        &self.contacts
145    }
146
147    pub fn load_users(
148        &mut self,
149        mut user_ids: Vec<u64>,
150        cx: &mut ModelContext<Self>,
151    ) -> Task<Result<()>> {
152        let rpc = self.client.clone();
153        let http = self.http.clone();
154        user_ids.retain(|id| !self.users.contains_key(id));
155        cx.spawn_weak(|this, mut cx| async move {
156            if let Some(rpc) = rpc.upgrade() {
157                if !user_ids.is_empty() {
158                    let response = rpc.request(proto::GetUsers { user_ids }).await?;
159                    let new_users = future::join_all(
160                        response
161                            .users
162                            .into_iter()
163                            .map(|user| User::new(user, http.as_ref())),
164                    )
165                    .await;
166
167                    if let Some(this) = this.upgrade(&cx) {
168                        this.update(&mut cx, |this, _| {
169                            for user in new_users {
170                                this.users.insert(user.id, Arc::new(user));
171                            }
172                        });
173                    }
174                }
175            }
176            Ok(())
177        })
178    }
179
180    pub fn fetch_user(
181        &mut self,
182        user_id: u64,
183        cx: &mut ModelContext<Self>,
184    ) -> Task<Result<Arc<User>>> {
185        if let Some(user) = self.users.get(&user_id).cloned() {
186            return cx.foreground().spawn(async move { Ok(user) });
187        }
188
189        let load_users = self.load_users(vec![user_id], cx);
190        cx.spawn(|this, mut cx| async move {
191            load_users.await?;
192            this.update(&mut cx, |this, _| {
193                this.users
194                    .get(&user_id)
195                    .cloned()
196                    .ok_or_else(|| anyhow!("server responded with no users"))
197            })
198        })
199    }
200
201    pub fn current_user(&self) -> Option<Arc<User>> {
202        self.current_user.borrow().clone()
203    }
204
205    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
206        self.current_user.clone()
207    }
208}
209
210impl User {
211    async fn new(message: proto::User, http: &dyn HttpClient) -> Self {
212        User {
213            id: message.id,
214            github_login: message.github_login,
215            avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
216        }
217    }
218}
219
220impl Contact {
221    async fn from_proto(
222        contact: proto::Contact,
223        user_store: &ModelHandle<UserStore>,
224        cx: &mut AsyncAppContext,
225    ) -> Result<Self> {
226        let user = user_store
227            .update(cx, |user_store, cx| {
228                user_store.fetch_user(contact.user_id, cx)
229            })
230            .await?;
231        let mut projects = Vec::new();
232        for project in contact.projects {
233            let mut guests = Vec::new();
234            for participant_id in project.guests {
235                guests.push(
236                    user_store
237                        .update(cx, |user_store, cx| {
238                            user_store.fetch_user(participant_id, cx)
239                        })
240                        .await?,
241                );
242            }
243            projects.push(ProjectMetadata {
244                id: project.id,
245                worktree_root_names: project.worktree_root_names.clone(),
246                is_shared: project.is_shared,
247                guests,
248            });
249        }
250        Ok(Self { user, projects })
251    }
252}
253
254async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
255    let mut response = http
256        .get(url, Default::default(), true)
257        .await
258        .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
259
260    if !response.status().is_success() {
261        return Err(anyhow!("avatar request failed {:?}", response.status()));
262    }
263
264    let mut body = Vec::new();
265    response
266        .body_mut()
267        .read_to_end(&mut body)
268        .await
269        .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
270    let format = image::guess_format(&body)?;
271    let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
272    Ok(ImageData::new(image))
273}