1use crate::{
2 http::{HttpClient, Method, Request, Url},
3 rpc::{Client, Status},
4 util::TryFutureExt,
5};
6use anyhow::{anyhow, Context, Result};
7use futures::future;
8use gpui::{executor, ImageData, Task};
9use parking_lot::Mutex;
10use postage::{oneshot, prelude::Stream, sink::Sink, watch};
11use std::{
12 collections::HashMap,
13 sync::{Arc, Weak},
14};
15use zrpc::proto;
16
17#[derive(Debug)]
18pub struct User {
19 pub id: u64,
20 pub github_login: String,
21 pub avatar: Option<Arc<ImageData>>,
22}
23
24pub struct UserStore {
25 users: Mutex<HashMap<u64, Arc<User>>>,
26 current_user: watch::Receiver<Option<Arc<User>>>,
27 rpc: Arc<Client>,
28 http: Arc<dyn HttpClient>,
29 _maintain_current_user: Task<()>,
30}
31
32impl UserStore {
33 pub fn new(
34 rpc: Arc<Client>,
35 http: Arc<dyn HttpClient>,
36 executor: &executor::Background,
37 ) -> Arc<Self> {
38 let (mut current_user_tx, current_user_rx) = watch::channel();
39 let (mut this_tx, mut this_rx) = oneshot::channel::<Weak<Self>>();
40 let this = Arc::new(Self {
41 users: Default::default(),
42 current_user: current_user_rx,
43 rpc: rpc.clone(),
44 http,
45 _maintain_current_user: executor.spawn(async move {
46 let this = if let Some(this) = this_rx.recv().await {
47 this
48 } else {
49 return;
50 };
51 let mut status = rpc.status();
52 while let Some(status) = status.recv().await {
53 match status {
54 Status::Connected { .. } => {
55 if let Some((this, user_id)) = this.upgrade().zip(rpc.user_id()) {
56 current_user_tx
57 .send(this.fetch_user(user_id).log_err().await)
58 .await
59 .ok();
60 }
61 }
62 Status::SignedOut => {
63 current_user_tx.send(None).await.ok();
64 }
65 _ => {}
66 }
67 }
68 }),
69 });
70 let weak = Arc::downgrade(&this);
71 executor
72 .spawn(async move { this_tx.send(weak).await })
73 .detach();
74 this
75 }
76
77 pub async fn load_users(&self, mut user_ids: Vec<u64>) -> Result<()> {
78 {
79 let users = self.users.lock();
80 user_ids.retain(|id| !users.contains_key(id));
81 }
82
83 if !user_ids.is_empty() {
84 let response = self.rpc.request(proto::GetUsers { user_ids }).await?;
85 let new_users = future::join_all(
86 response
87 .users
88 .into_iter()
89 .map(|user| User::new(user, self.http.as_ref())),
90 )
91 .await;
92 let mut users = self.users.lock();
93 for user in new_users {
94 users.insert(user.id, Arc::new(user));
95 }
96 }
97
98 Ok(())
99 }
100
101 pub async fn fetch_user(&self, user_id: u64) -> Result<Arc<User>> {
102 if let Some(user) = self.users.lock().get(&user_id).cloned() {
103 return Ok(user);
104 }
105
106 self.load_users(vec![user_id]).await?;
107 self.users
108 .lock()
109 .get(&user_id)
110 .cloned()
111 .ok_or_else(|| anyhow!("server responded with no users"))
112 }
113
114 pub fn current_user(&self) -> &watch::Receiver<Option<Arc<User>>> {
115 &self.current_user
116 }
117}
118
119impl User {
120 async fn new(message: proto::User, http: &dyn HttpClient) -> Self {
121 User {
122 id: message.id,
123 github_login: message.github_login,
124 avatar: fetch_avatar(http, &message.avatar_url).log_err().await,
125 }
126 }
127}
128
129async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
130 let url = Url::parse(url).with_context(|| format!("failed to parse avatar url {:?}", url))?;
131 let mut request = Request::new(Method::Get, url);
132 request.middleware(surf::middleware::Redirect::default());
133
134 let mut response = http
135 .send(request)
136 .await
137 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
138 let bytes = response
139 .body_bytes()
140 .await
141 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
142 let format = image::guess_format(&bytes)?;
143 let image = image::load_from_memory_with_format(&bytes, format)?.into_bgra8();
144 Ok(ImageData::new(image))
145}