user.rs

  1use crate::{
  2    http::{HttpClient, Method, Request, Url},
  3    rpc::{Client, Status},
  4    util::TryFutureExt,
  5};
  6use anyhow::{anyhow, Context, Result};
  7use futures::future;
  8use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
  9use postage::{prelude::Stream, sink::Sink, watch};
 10use std::{
 11    collections::{HashMap, HashSet},
 12    sync::Arc,
 13};
 14use zrpc::{proto, TypedEnvelope};
 15
/// A user known to the client, as built by `User::new` from a `proto::User` message.
#[derive(Debug)]
pub struct User {
    /// Numeric id of the user; `UserStore` keys its user map by this value.
    pub id: u64,
    /// The user's GitHub login. Collaborator lists are sorted by this field.
    pub github_login: String,
    /// Decoded avatar image, if one is available.
    /// NOTE(review): presumably `None` when fetching or decoding the avatar failed
    /// (see `User::new`) — confirm against `log_err`.
    pub avatar: Option<Arc<ImageData>>,
}
 22
/// A user paired with metadata about that user's worktrees, as built by
/// `Collaborator::from_proto`.
#[derive(Debug)]
pub struct Collaborator {
    /// The user this entry describes.
    pub user: Arc<User>,
    /// One entry per worktree listed for this user in the originating message.
    pub worktrees: Vec<WorktreeMetadata>,
}
 28
/// Summary information about a single worktree belonging to a `Collaborator`.
#[derive(Debug)]
pub struct WorktreeMetadata {
    /// Identifier of the worktree, copied from the protobuf message.
    pub id: u64,
    /// Root name of the worktree, copied from the protobuf message.
    pub root_name: String,
    /// Whether the worktree is flagged as shared in the protobuf message.
    pub is_shared: bool,
    /// Users listed as guests of this worktree, resolved from their ids.
    pub guests: Vec<Arc<User>>,
}
 36
/// Model that caches users, tracks the current user, and keeps the collaborator list
/// up to date from server messages.
pub struct UserStore {
    /// Users loaded so far, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Receiving half of the channel on which the current user is published.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Most recently applied collaborator list, sorted by GitHub login.
    collaborators: Arc<[Collaborator]>,
    /// RPC client used to request users and to observe connection status.
    rpc: Arc<Client>,
    /// HTTP client used to download avatars.
    http: Arc<dyn HttpClient>,
    // The two tasks below are stored only to keep them owned by the store.
    // NOTE(review): presumably dropping a `Task` cancels it — confirm against gpui.
    /// Task that applies incoming `UpdateCollaborators` messages.
    _maintain_collaborators: Task<()>,
    /// Task that republishes the current user whenever the connection status changes.
    _maintain_current_user: Task<()>,
}
 46
/// Event type associated with `UserStore`. It has no variants, so no value of this type
/// can be constructed.
pub enum Event {}
 48
/// Makes `UserStore` usable as a gpui `Entity`.
impl Entity for UserStore {
    // The (variant-less) `Event` enum declared in this file.
    type Event = Event;
}
 52
 53impl UserStore {
 54    pub fn new(rpc: Arc<Client>, http: Arc<dyn HttpClient>, cx: &mut ModelContext<Self>) -> Self {
 55        let (mut current_user_tx, current_user_rx) = watch::channel();
 56        let (mut update_collaborators_tx, mut update_collaborators_rx) =
 57            watch::channel::<Option<proto::UpdateCollaborators>>();
 58        let update_collaborators_subscription = rpc.subscribe(
 59            cx,
 60            move |_: &mut Self, msg: TypedEnvelope<proto::UpdateCollaborators>, _, _| {
 61                let _ = update_collaborators_tx.blocking_send(Some(msg.payload));
 62                Ok(())
 63            },
 64        );
 65        Self {
 66            users: Default::default(),
 67            current_user: current_user_rx,
 68            collaborators: Arc::from([]),
 69            rpc: rpc.clone(),
 70            http,
 71            _maintain_collaborators: cx.spawn_weak(|this, mut cx| async move {
 72                let _subscription = update_collaborators_subscription;
 73                while let Some(message) = update_collaborators_rx.recv().await {
 74                    if let Some((message, this)) = message.zip(this.upgrade(&cx)) {
 75                        this.update(&mut cx, |this, cx| this.update_collaborators(message, cx))
 76                            .log_err()
 77                            .await;
 78                    }
 79                }
 80            }),
 81            _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
 82                let mut status = rpc.status();
 83                while let Some(status) = status.recv().await {
 84                    match status {
 85                        Status::Connected { .. } => {
 86                            if let Some((this, user_id)) = this.upgrade(&cx).zip(rpc.user_id()) {
 87                                let user = this
 88                                    .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
 89                                    .log_err()
 90                                    .await;
 91                                current_user_tx.send(user).await.ok();
 92                            }
 93                        }
 94                        Status::SignedOut => {
 95                            current_user_tx.send(None).await.ok();
 96                        }
 97                        _ => {}
 98                    }
 99                }
100            }),
101        }
102    }
103
104    fn update_collaborators(
105        &mut self,
106        message: proto::UpdateCollaborators,
107        cx: &mut ModelContext<Self>,
108    ) -> Task<Result<()>> {
109        let mut user_ids = HashSet::new();
110        for collaborator in &message.collaborators {
111            user_ids.insert(collaborator.user_id);
112            user_ids.extend(
113                collaborator
114                    .worktrees
115                    .iter()
116                    .flat_map(|w| &w.guests)
117                    .copied(),
118            );
119        }
120
121        let load_users = self.load_users(user_ids.into_iter().collect(), cx);
122        cx.spawn(|this, mut cx| async move {
123            load_users.await?;
124
125            let mut collaborators = Vec::new();
126            for collaborator in message.collaborators {
127                collaborators.push(Collaborator::from_proto(collaborator, &this, &mut cx).await?);
128            }
129
130            this.update(&mut cx, |this, cx| {
131                collaborators.sort_by(|a, b| a.user.github_login.cmp(&b.user.github_login));
132                this.collaborators = collaborators.into();
133                cx.notify();
134            });
135
136            Ok(())
137        })
138    }
139
    /// Returns the collaborator list most recently stored by `update_collaborators`
    /// (empty until the first update is applied).
    pub fn collaborators(&self) -> &Arc<[Collaborator]> {
        &self.collaborators
    }
143
144    pub fn load_users(
145        &mut self,
146        mut user_ids: Vec<u64>,
147        cx: &mut ModelContext<Self>,
148    ) -> Task<Result<()>> {
149        let rpc = self.rpc.clone();
150        let http = self.http.clone();
151        user_ids.retain(|id| !self.users.contains_key(id));
152        cx.spawn_weak(|this, mut cx| async move {
153            if !user_ids.is_empty() {
154                let response = rpc.request(proto::GetUsers { user_ids }).await?;
155                let new_users = future::join_all(
156                    response
157                        .users
158                        .into_iter()
159                        .map(|user| User::new(user, http.as_ref())),
160                )
161                .await;
162
163                if let Some(this) = this.upgrade(&cx) {
164                    this.update(&mut cx, |this, _| {
165                        for user in new_users {
166                            this.users.insert(user.id, Arc::new(user));
167                        }
168                    });
169                }
170            }
171
172            Ok(())
173        })
174    }
175
176    pub fn fetch_user(
177        &mut self,
178        user_id: u64,
179        cx: &mut ModelContext<Self>,
180    ) -> Task<Result<Arc<User>>> {
181        if let Some(user) = self.users.get(&user_id).cloned() {
182            return cx.spawn_weak(|_, _| async move { Ok(user) });
183        }
184
185        let load_users = self.load_users(vec![user_id], cx);
186        cx.spawn(|this, mut cx| async move {
187            load_users.await?;
188            this.update(&mut cx, |this, _| {
189                this.users
190                    .get(&user_id)
191                    .cloned()
192                    .ok_or_else(|| anyhow!("server responded with no users"))
193            })
194        })
195    }
196
    /// Returns a clone of the value currently held by the current-user channel:
    /// `Some(user)` after a successful fetch, `None` otherwise.
    pub fn current_user(&self) -> Option<Arc<User>> {
        self.current_user.borrow().clone()
    }
200
    /// Returns a clone of the current-user receiver so callers can observe changes to
    /// the current user themselves.
    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
        self.current_user.clone()
    }
204}
205
206impl User {
207    async fn new(message: proto::User, http: &dyn HttpClient) -> Self {
208        User {
209            id: message.id,
210            github_login: message.github_login,
211            avatar: fetch_avatar(http, &message.avatar_url).log_err().await,
212        }
213    }
214}
215
216impl Collaborator {
217    async fn from_proto(
218        collaborator: proto::Collaborator,
219        user_store: &ModelHandle<UserStore>,
220        cx: &mut AsyncAppContext,
221    ) -> Result<Self> {
222        let user = user_store
223            .update(cx, |user_store, cx| {
224                user_store.fetch_user(collaborator.user_id, cx)
225            })
226            .await?;
227        let mut worktrees = Vec::new();
228        for worktree in collaborator.worktrees {
229            let mut guests = Vec::new();
230            for participant_id in worktree.guests {
231                guests.push(
232                    user_store
233                        .update(cx, |user_store, cx| {
234                            user_store.fetch_user(participant_id, cx)
235                        })
236                        .await?,
237                );
238            }
239            worktrees.push(WorktreeMetadata {
240                id: worktree.id,
241                root_name: worktree.root_name,
242                is_shared: worktree.is_shared,
243                guests,
244            });
245        }
246        Ok(Self { user, worktrees })
247    }
248}
249
250async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
251    let url = Url::parse(url).with_context(|| format!("failed to parse avatar url {:?}", url))?;
252    let mut request = Request::new(Method::Get, url);
253    request.middleware(surf::middleware::Redirect::default());
254
255    let mut response = http
256        .send(request)
257        .await
258        .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
259    let bytes = response
260        .body_bytes()
261        .await
262        .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
263    let format = image::guess_format(&bytes)?;
264    let image = image::load_from_memory_with_format(&bytes, format)?.into_bgra8();
265    Ok(ImageData::new(image))
266}