user.rs

  1use crate::{
  2    http::{HttpClient, Method, Request, Url},
  3    rpc::{Client, Status},
  4    util::TryFutureExt,
  5};
  6use anyhow::{anyhow, Context, Result};
  7use futures::future;
  8use gpui::{executor, ImageData, Task};
  9use parking_lot::Mutex;
 10use postage::{oneshot, prelude::Stream, sink::Sink, watch};
 11use std::{
 12    collections::HashMap,
 13    sync::{Arc, Weak},
 14};
 15use zrpc::proto;
 16
 17#[derive(Debug)]
 18pub struct User {
 19    pub id: u64,
 20    pub github_login: String,
 21    pub avatar: Option<Arc<ImageData>>,
 22}
 23
 24pub struct UserStore {
 25    users: Mutex<HashMap<u64, Arc<User>>>,
 26    current_user: watch::Receiver<Option<Arc<User>>>,
 27    rpc: Arc<Client>,
 28    http: Arc<dyn HttpClient>,
 29    _maintain_current_user: Task<()>,
 30}
 31
 32impl UserStore {
 33    pub fn new(
 34        rpc: Arc<Client>,
 35        http: Arc<dyn HttpClient>,
 36        executor: &executor::Background,
 37    ) -> Arc<Self> {
 38        let (mut current_user_tx, current_user_rx) = watch::channel();
 39        let (mut this_tx, mut this_rx) = oneshot::channel::<Weak<Self>>();
 40        let this = Arc::new(Self {
 41            users: Default::default(),
 42            current_user: current_user_rx,
 43            rpc: rpc.clone(),
 44            http,
 45            _maintain_current_user: executor.spawn(async move {
 46                let this = if let Some(this) = this_rx.recv().await {
 47                    this
 48                } else {
 49                    return;
 50                };
 51                let mut status = rpc.status();
 52                while let Some(status) = status.recv().await {
 53                    match status {
 54                        Status::Connected { user_id, .. } => {
 55                            if let Some(this) = this.upgrade() {
 56                                current_user_tx
 57                                    .send(this.fetch_user(user_id).log_err().await)
 58                                    .await
 59                                    .ok();
 60                            }
 61                        }
 62                        Status::SignedOut => {
 63                            current_user_tx.send(None).await.ok();
 64                        }
 65                        _ => {}
 66                    }
 67                }
 68            }),
 69        });
 70        let weak = Arc::downgrade(&this);
 71        executor
 72            .spawn(async move { this_tx.send(weak).await })
 73            .detach();
 74        this
 75    }
 76
 77    pub async fn load_users(&self, mut user_ids: Vec<u64>) -> Result<()> {
 78        {
 79            let users = self.users.lock();
 80            user_ids.retain(|id| !users.contains_key(id));
 81        }
 82
 83        if !user_ids.is_empty() {
 84            let response = self.rpc.request(proto::GetUsers { user_ids }).await?;
 85            let new_users = future::join_all(
 86                response
 87                    .users
 88                    .into_iter()
 89                    .map(|user| User::new(user, self.http.as_ref())),
 90            )
 91            .await;
 92            let mut users = self.users.lock();
 93            for user in new_users {
 94                users.insert(user.id, Arc::new(user));
 95            }
 96        }
 97
 98        Ok(())
 99    }
100
101    pub async fn fetch_user(&self, user_id: u64) -> Result<Arc<User>> {
102        if let Some(user) = self.users.lock().get(&user_id).cloned() {
103            return Ok(user);
104        }
105
106        self.load_users(vec![user_id]).await?;
107        self.users
108            .lock()
109            .get(&user_id)
110            .cloned()
111            .ok_or_else(|| anyhow!("server responded with no users"))
112    }
113
114    pub fn current_user(&self) -> &watch::Receiver<Option<Arc<User>>> {
115        &self.current_user
116    }
117}
118
119impl User {
120    async fn new(message: proto::User, http: &dyn HttpClient) -> Self {
121        User {
122            id: message.id,
123            github_login: message.github_login,
124            avatar: fetch_avatar(http, &message.avatar_url).log_err().await,
125        }
126    }
127}
128
129async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
130    let url = Url::parse(url).with_context(|| format!("failed to parse avatar url {:?}", url))?;
131    let request = Request::new(Method::Get, url);
132    let mut response = http
133        .send(request)
134        .await
135        .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
136    let bytes = response
137        .body_bytes()
138        .await
139        .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
140    let format = image::guess_format(&bytes)?;
141    let image = image::load_from_memory_with_format(&bytes, format)?.into_bgra8();
142    Ok(ImageData::new(image))
143}