user.rs

  1use super::{
  2    http::{HttpClient, Method, Request, Url},
  3    proto, Client, Status, TypedEnvelope,
  4};
  5use anyhow::{anyhow, Context, Result};
  6use futures::future;
  7use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
  8use postage::{prelude::Stream, sink::Sink, watch};
  9use std::{
 10    collections::{HashMap, HashSet},
 11    sync::Arc,
 12};
 13use util::TryFutureExt as _;
 14
 15#[derive(Debug)]
 16pub struct User {
 17    pub id: u64,
 18    pub github_login: String,
 19    pub avatar: Option<Arc<ImageData>>,
 20}
 21
 22#[derive(Debug)]
 23pub struct Contact {
 24    pub user: Arc<User>,
 25    pub worktrees: Vec<WorktreeMetadata>,
 26}
 27
 28#[derive(Debug)]
 29pub struct WorktreeMetadata {
 30    pub id: u64,
 31    pub root_name: String,
 32    pub is_shared: bool,
 33    pub guests: Vec<Arc<User>>,
 34}
 35
 36pub struct UserStore {
 37    users: HashMap<u64, Arc<User>>,
 38    current_user: watch::Receiver<Option<Arc<User>>>,
 39    contacts: Arc<[Contact]>,
 40    rpc: Arc<Client>,
 41    http: Arc<dyn HttpClient>,
 42    _maintain_contacts: Task<()>,
 43    _maintain_current_user: Task<()>,
 44}
 45
 46pub enum Event {}
 47
 48impl Entity for UserStore {
 49    type Event = Event;
 50}
 51
 52impl UserStore {
 53    pub fn new(rpc: Arc<Client>, http: Arc<dyn HttpClient>, cx: &mut ModelContext<Self>) -> Self {
 54        let (mut current_user_tx, current_user_rx) = watch::channel();
 55        let (mut update_contacts_tx, mut update_contacts_rx) =
 56            watch::channel::<Option<proto::UpdateContacts>>();
 57        let update_contacts_subscription = rpc.subscribe(
 58            cx,
 59            move |_: &mut Self, msg: TypedEnvelope<proto::UpdateContacts>, _, _| {
 60                let _ = update_contacts_tx.blocking_send(Some(msg.payload));
 61                Ok(())
 62            },
 63        );
 64        Self {
 65            users: Default::default(),
 66            current_user: current_user_rx,
 67            contacts: Arc::from([]),
 68            rpc: rpc.clone(),
 69            http,
 70            _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
 71                let _subscription = update_contacts_subscription;
 72                while let Some(message) = update_contacts_rx.recv().await {
 73                    if let Some((message, this)) = message.zip(this.upgrade(&cx)) {
 74                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
 75                            .log_err()
 76                            .await;
 77                    }
 78                }
 79            }),
 80            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
 81                let mut status = rpc.status();
 82                while let Some(status) = status.recv().await {
 83                    match status {
 84                        Status::Connected { .. } => {
 85                            if let Some((this, user_id)) = this.upgrade(&cx).zip(rpc.user_id()) {
 86                                let user = this
 87                                    .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
 88                                    .log_err()
 89                                    .await;
 90                                current_user_tx.send(user).await.ok();
 91                            }
 92                        }
 93                        Status::SignedOut => {
 94                            current_user_tx.send(None).await.ok();
 95                        }
 96                        _ => {}
 97                    }
 98                }
 99            }),
100        }
101    }
102
103    fn update_contacts(
104        &mut self,
105        message: proto::UpdateContacts,
106        cx: &mut ModelContext<Self>,
107    ) -> Task<Result<()>> {
108        let mut user_ids = HashSet::new();
109        for contact in &message.contacts {
110            user_ids.insert(contact.user_id);
111            user_ids.extend(contact.worktrees.iter().flat_map(|w| &w.guests).copied());
112        }
113
114        let load_users = self.load_users(user_ids.into_iter().collect(), cx);
115        cx.spawn(|this, mut cx| async move {
116            load_users.await?;
117
118            let mut contacts = Vec::new();
119            for contact in message.contacts {
120                contacts.push(Contact::from_proto(contact, &this, &mut cx).await?);
121            }
122
123            this.update(&mut cx, |this, cx| {
124                contacts.sort_by(|a, b| a.user.github_login.cmp(&b.user.github_login));
125                this.contacts = contacts.into();
126                cx.notify();
127            });
128
129            Ok(())
130        })
131    }
132
133    pub fn contacts(&self) -> &Arc<[Contact]> {
134        &self.contacts
135    }
136
137    pub fn load_users(
138        &mut self,
139        mut user_ids: Vec<u64>,
140        cx: &mut ModelContext<Self>,
141    ) -> Task<Result<()>> {
142        let rpc = self.rpc.clone();
143        let http = self.http.clone();
144        user_ids.retain(|id| !self.users.contains_key(id));
145        cx.spawn_weak(|this, mut cx| async move {
146            if !user_ids.is_empty() {
147                let response = rpc.request(proto::GetUsers { user_ids }).await?;
148                let new_users = future::join_all(
149                    response
150                        .users
151                        .into_iter()
152                        .map(|user| User::new(user, http.as_ref())),
153                )
154                .await;
155
156                if let Some(this) = this.upgrade(&cx) {
157                    this.update(&mut cx, |this, _| {
158                        for user in new_users {
159                            this.users.insert(user.id, Arc::new(user));
160                        }
161                    });
162                }
163            }
164
165            Ok(())
166        })
167    }
168
169    pub fn fetch_user(
170        &mut self,
171        user_id: u64,
172        cx: &mut ModelContext<Self>,
173    ) -> Task<Result<Arc<User>>> {
174        if let Some(user) = self.users.get(&user_id).cloned() {
175            return cx.spawn_weak(|_, _| async move { Ok(user) });
176        }
177
178        let load_users = self.load_users(vec![user_id], cx);
179        cx.spawn(|this, mut cx| async move {
180            load_users.await?;
181            this.update(&mut cx, |this, _| {
182                this.users
183                    .get(&user_id)
184                    .cloned()
185                    .ok_or_else(|| anyhow!("server responded with no users"))
186            })
187        })
188    }
189
190    pub fn current_user(&self) -> Option<Arc<User>> {
191        self.current_user.borrow().clone()
192    }
193
194    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
195        self.current_user.clone()
196    }
197}
198
199impl User {
200    async fn new(message: proto::User, http: &dyn HttpClient) -> Self {
201        User {
202            id: message.id,
203            github_login: message.github_login,
204            avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
205        }
206    }
207}
208
209impl Contact {
210    async fn from_proto(
211        contact: proto::Contact,
212        user_store: &ModelHandle<UserStore>,
213        cx: &mut AsyncAppContext,
214    ) -> Result<Self> {
215        let user = user_store
216            .update(cx, |user_store, cx| {
217                user_store.fetch_user(contact.user_id, cx)
218            })
219            .await?;
220        let mut worktrees = Vec::new();
221        for worktree in contact.worktrees {
222            let mut guests = Vec::new();
223            for participant_id in worktree.guests {
224                guests.push(
225                    user_store
226                        .update(cx, |user_store, cx| {
227                            user_store.fetch_user(participant_id, cx)
228                        })
229                        .await?,
230                );
231            }
232            worktrees.push(WorktreeMetadata {
233                id: worktree.id,
234                root_name: worktree.root_name,
235                is_shared: worktree.is_shared,
236                guests,
237            });
238        }
239        Ok(Self { user, worktrees })
240    }
241}
242
243async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
244    let url = Url::parse(url).with_context(|| format!("failed to parse avatar url {:?}", url))?;
245    let mut request = Request::new(Method::Get, url);
246    request.middleware(surf::middleware::Redirect::default());
247
248    let mut response = http
249        .send(request)
250        .await
251        .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
252    if !response.status().is_success() {
253        return Err(anyhow!("avatar request failed {:?}", response.status()));
254    }
255    let bytes = response
256        .body_bytes()
257        .await
258        .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
259    let format = image::guess_format(&bytes)?;
260    let image = image::load_from_memory_with_format(&bytes, format)?.into_bgra8();
261    Ok(ImageData::new(image))
262}