user.rs

  1use super::{
  2    http::{HttpClient, Method, Request, Url},
  3    proto, Client, Status, TypedEnvelope,
  4};
  5use anyhow::{anyhow, Context, Result};
  6use futures::future;
  7use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
  8use postage::{prelude::Stream, sink::Sink, watch};
  9use std::{
 10    collections::{HashMap, HashSet},
 11    sync::Arc,
 12};
 13use util::TryFutureExt as _;
 14
/// A user as held by the [`UserStore`], built from a `proto::User` message.
#[derive(Debug)]
pub struct User {
    /// Identifier copied from the protocol message; also the key in the store's user map.
    pub id: u64,
    /// GitHub login copied from the protocol message.
    pub github_login: String,
    /// Decoded avatar image, or `None` when fetching or decoding it failed.
    pub avatar: Option<Arc<ImageData>>,
}
 21
/// A collaborator entry built from a `proto::Collaborator` message.
#[derive(Debug)]
pub struct Collaborator {
    /// The user resolved from the message's `user_id`.
    pub user: Arc<User>,
    /// One entry per worktree listed in the message, in message order.
    pub worktrees: Vec<WorktreeMetadata>,
}
 27
/// Description of one worktree listed for a [`Collaborator`].
#[derive(Debug)]
pub struct WorktreeMetadata {
    /// Worktree id copied from the protocol message.
    pub id: u64,
    /// Root name copied from the protocol message.
    pub root_name: String,
    /// Sharing flag copied from the protocol message.
    pub is_shared: bool,
    /// Users resolved from the guest ids listed for this worktree.
    pub guests: Vec<Arc<User>>,
}
 35
/// Model that caches users, tracks the current user and keeps the collaborator list up to date.
pub struct UserStore {
    /// Users loaded so far, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Receiving side of the channel that the `_maintain_current_user` task publishes to.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Most recently applied collaborator list, sorted by GitHub login.
    collaborators: Arc<[Collaborator]>,
    /// RPC client used to request users and to observe connection status.
    rpc: Arc<Client>,
    /// HTTP client used to download avatars.
    http: Arc<dyn HttpClient>,
    /// Task that applies incoming `proto::UpdateCollaborators` messages.
    /// NOTE(review): presumably stored so the task lives as long as the store — confirm `Task` drop semantics.
    _maintain_collaborators: Task<()>,
    /// Task that republishes the current user when the client's status changes.
    /// NOTE(review): presumably stored so the task lives as long as the store — confirm `Task` drop semantics.
    _maintain_current_user: Task<()>,
}
 45
/// Event type used by the `Entity` impl of [`UserStore`]; it currently has no variants.
pub enum Event {}
 47
/// Makes `UserStore` usable as a gpui entity, with [`Event`] as its event type.
impl Entity for UserStore {
    type Event = Event;
}
 51
 52impl UserStore {
 53    pub fn new(rpc: Arc<Client>, http: Arc<dyn HttpClient>, cx: &mut ModelContext<Self>) -> Self {
 54        let (mut current_user_tx, current_user_rx) = watch::channel();
 55        let (mut update_collaborators_tx, mut update_collaborators_rx) =
 56            watch::channel::<Option<proto::UpdateCollaborators>>();
 57        let update_collaborators_subscription = rpc.subscribe(
 58            cx,
 59            move |_: &mut Self, msg: TypedEnvelope<proto::UpdateCollaborators>, _, _| {
 60                let _ = update_collaborators_tx.blocking_send(Some(msg.payload));
 61                Ok(())
 62            },
 63        );
 64        Self {
 65            users: Default::default(),
 66            current_user: current_user_rx,
 67            collaborators: Arc::from([]),
 68            rpc: rpc.clone(),
 69            http,
 70            _maintain_collaborators: cx.spawn_weak(|this, mut cx| async move {
 71                let _subscription = update_collaborators_subscription;
 72                while let Some(message) = update_collaborators_rx.recv().await {
 73                    if let Some((message, this)) = message.zip(this.upgrade(&cx)) {
 74                        this.update(&mut cx, |this, cx| this.update_collaborators(message, cx))
 75                            .log_err()
 76                            .await;
 77                    }
 78                }
 79            }),
 80            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
 81                let mut status = rpc.status();
 82                while let Some(status) = status.recv().await {
 83                    match status {
 84                        Status::Connected { .. } => {
 85                            if let Some((this, user_id)) = this.upgrade(&cx).zip(rpc.user_id()) {
 86                                let user = this
 87                                    .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
 88                                    .log_err()
 89                                    .await;
 90                                current_user_tx.send(user).await.ok();
 91                            }
 92                        }
 93                        Status::SignedOut => {
 94                            current_user_tx.send(None).await.ok();
 95                        }
 96                        _ => {}
 97                    }
 98                }
 99            }),
100        }
101    }
102
103    fn update_collaborators(
104        &mut self,
105        message: proto::UpdateCollaborators,
106        cx: &mut ModelContext<Self>,
107    ) -> Task<Result<()>> {
108        let mut user_ids = HashSet::new();
109        for collaborator in &message.collaborators {
110            user_ids.insert(collaborator.user_id);
111            user_ids.extend(
112                collaborator
113                    .worktrees
114                    .iter()
115                    .flat_map(|w| &w.guests)
116                    .copied(),
117            );
118        }
119
120        let load_users = self.load_users(user_ids.into_iter().collect(), cx);
121        cx.spawn(|this, mut cx| async move {
122            load_users.await?;
123
124            let mut collaborators = Vec::new();
125            for collaborator in message.collaborators {
126                collaborators.push(Collaborator::from_proto(collaborator, &this, &mut cx).await?);
127            }
128
129            this.update(&mut cx, |this, cx| {
130                collaborators.sort_by(|a, b| a.user.github_login.cmp(&b.user.github_login));
131                this.collaborators = collaborators.into();
132                cx.notify();
133            });
134
135            Ok(())
136        })
137    }
138
    /// Returns the collaborator list most recently stored by `update_collaborators`
    /// (empty until the first update has been applied).
    pub fn collaborators(&self) -> &Arc<[Collaborator]> {
        &self.collaborators
    }
142
143    pub fn load_users(
144        &mut self,
145        mut user_ids: Vec<u64>,
146        cx: &mut ModelContext<Self>,
147    ) -> Task<Result<()>> {
148        let rpc = self.rpc.clone();
149        let http = self.http.clone();
150        user_ids.retain(|id| !self.users.contains_key(id));
151        cx.spawn_weak(|this, mut cx| async move {
152            if !user_ids.is_empty() {
153                let response = rpc.request(proto::GetUsers { user_ids }).await?;
154                let new_users = future::join_all(
155                    response
156                        .users
157                        .into_iter()
158                        .map(|user| User::new(user, http.as_ref())),
159                )
160                .await;
161
162                if let Some(this) = this.upgrade(&cx) {
163                    this.update(&mut cx, |this, _| {
164                        for user in new_users {
165                            this.users.insert(user.id, Arc::new(user));
166                        }
167                    });
168                }
169            }
170
171            Ok(())
172        })
173    }
174
175    pub fn fetch_user(
176        &mut self,
177        user_id: u64,
178        cx: &mut ModelContext<Self>,
179    ) -> Task<Result<Arc<User>>> {
180        if let Some(user) = self.users.get(&user_id).cloned() {
181            return cx.spawn_weak(|_, _| async move { Ok(user) });
182        }
183
184        let load_users = self.load_users(vec![user_id], cx);
185        cx.spawn(|this, mut cx| async move {
186            load_users.await?;
187            this.update(&mut cx, |this, _| {
188                this.users
189                    .get(&user_id)
190                    .cloned()
191                    .ok_or_else(|| anyhow!("server responded with no users"))
192            })
193        })
194    }
195
    /// Returns a clone of the value currently held by the current-user channel
    /// (`None` when no user has been published or after sign-out).
    pub fn current_user(&self) -> Option<Arc<User>> {
        self.current_user.borrow().clone()
    }
199
    /// Returns a clone of the receiver for the current-user channel, so callers
    /// can observe the current user themselves.
    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
        self.current_user.clone()
    }
203}
204
205impl User {
206    async fn new(message: proto::User, http: &dyn HttpClient) -> Self {
207        User {
208            id: message.id,
209            github_login: message.github_login,
210            avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
211        }
212    }
213}
214
215impl Collaborator {
216    async fn from_proto(
217        collaborator: proto::Collaborator,
218        user_store: &ModelHandle<UserStore>,
219        cx: &mut AsyncAppContext,
220    ) -> Result<Self> {
221        let user = user_store
222            .update(cx, |user_store, cx| {
223                user_store.fetch_user(collaborator.user_id, cx)
224            })
225            .await?;
226        let mut worktrees = Vec::new();
227        for worktree in collaborator.worktrees {
228            let mut guests = Vec::new();
229            for participant_id in worktree.guests {
230                guests.push(
231                    user_store
232                        .update(cx, |user_store, cx| {
233                            user_store.fetch_user(participant_id, cx)
234                        })
235                        .await?,
236                );
237            }
238            worktrees.push(WorktreeMetadata {
239                id: worktree.id,
240                root_name: worktree.root_name,
241                is_shared: worktree.is_shared,
242                guests,
243            });
244        }
245        Ok(Self { user, worktrees })
246    }
247}
248
249async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
250    let url = Url::parse(url).with_context(|| format!("failed to parse avatar url {:?}", url))?;
251    let mut request = Request::new(Method::Get, url);
252    request.middleware(surf::middleware::Redirect::default());
253
254    let mut response = http
255        .send(request)
256        .await
257        .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
258    if !response.status().is_success() {
259        return Err(anyhow!("avatar request failed {:?}", response.status()));
260    }
261    let bytes = response
262        .body_bytes()
263        .await
264        .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
265    let format = image::guess_format(&bytes)?;
266    let image = image::load_from_memory_with_format(&bytes, format)?.into_bgra8();
267    Ok(ImageData::new(image))
268}