1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Result};
3use futures::{future, AsyncReadExt};
4use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
5use postage::{prelude::Stream, sink::Sink, watch};
6use rpc::proto::{RequestMessage, UsersResponse};
7use std::{
8 collections::{HashMap, HashSet},
9 sync::{Arc, Weak},
10};
11use util::TryFutureExt as _;
12
13#[derive(Debug)]
14pub struct User {
15 pub id: u64,
16 pub github_login: String,
17 pub avatar: Option<Arc<ImageData>>,
18}
19
20#[derive(Debug)]
21pub struct Contact {
22 pub user: Arc<User>,
23 pub projects: Vec<ProjectMetadata>,
24}
25
26#[derive(Debug)]
27pub struct ProjectMetadata {
28 pub id: u64,
29 pub is_shared: bool,
30 pub worktree_root_names: Vec<String>,
31 pub guests: Vec<Arc<User>>,
32}
33
34pub struct UserStore {
35 users: HashMap<u64, Arc<User>>,
36 update_contacts_tx: watch::Sender<Option<proto::UpdateContacts>>,
37 current_user: watch::Receiver<Option<Arc<User>>>,
38 contacts: Vec<Arc<Contact>>,
39 client: Weak<Client>,
40 http: Arc<dyn HttpClient>,
41 _maintain_contacts: Task<()>,
42 _maintain_current_user: Task<()>,
43}
44
/// Event type associated with `UserStore`.
///
/// The enum has no variants, so no event value can ever be constructed.
pub enum Event {}
46
47impl Entity for UserStore {
48 type Event = Event;
49}
50
51impl UserStore {
52 pub fn new(
53 client: Arc<Client>,
54 http: Arc<dyn HttpClient>,
55 cx: &mut ModelContext<Self>,
56 ) -> Self {
57 let (mut current_user_tx, current_user_rx) = watch::channel();
58 let (update_contacts_tx, mut update_contacts_rx) =
59 watch::channel::<Option<proto::UpdateContacts>>();
60 let rpc_subscription =
61 client.add_message_handler(cx.handle(), Self::handle_update_contacts);
62 Self {
63 users: Default::default(),
64 current_user: current_user_rx,
65 contacts: Default::default(),
66 client: Arc::downgrade(&client),
67 update_contacts_tx,
68 http,
69 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
70 let _subscription = rpc_subscription;
71 while let Some(message) = update_contacts_rx.recv().await {
72 if let Some((message, this)) = message.zip(this.upgrade(&cx)) {
73 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
74 .log_err()
75 .await;
76 }
77 }
78 }),
79 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
80 let mut status = client.status();
81 while let Some(status) = status.recv().await {
82 match status {
83 Status::Connected { .. } => {
84 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
85 let user = this
86 .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
87 .log_err()
88 .await;
89 current_user_tx.send(user).await.ok();
90 }
91 }
92 Status::SignedOut => {
93 current_user_tx.send(None).await.ok();
94 }
95 _ => {}
96 }
97 }
98 }),
99 }
100 }
101
102 async fn handle_update_contacts(
103 this: ModelHandle<Self>,
104 msg: TypedEnvelope<proto::UpdateContacts>,
105 _: Arc<Client>,
106 mut cx: AsyncAppContext,
107 ) -> Result<()> {
108 this.update(&mut cx, |this, _| {
109 *this.update_contacts_tx.borrow_mut() = Some(msg.payload);
110 });
111 Ok(())
112 }
113
114 fn update_contacts(
115 &mut self,
116 message: proto::UpdateContacts,
117 cx: &mut ModelContext<Self>,
118 ) -> Task<Result<()>> {
119 let mut user_ids = HashSet::new();
120 for contact in &message.contacts {
121 user_ids.insert(contact.user_id);
122 user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
123 }
124
125 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
126 cx.spawn(|this, mut cx| async move {
127 load_users.await?;
128
129 let mut contacts = Vec::new();
130 for contact in message.contacts {
131 contacts.push(Arc::new(
132 Contact::from_proto(contact, &this, &mut cx).await?,
133 ));
134 }
135
136 this.update(&mut cx, |this, cx| {
137 contacts.sort_by(|a, b| a.user.github_login.cmp(&b.user.github_login));
138 this.contacts = contacts;
139 cx.notify();
140 });
141
142 Ok(())
143 })
144 }
145
146 pub fn contacts(&self) -> &[Arc<Contact>] {
147 &self.contacts
148 }
149
150 pub fn has_contact(&self, user: &Arc<User>) -> bool {
151 self.contacts
152 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
153 .is_ok()
154 }
155
156 pub fn get_users(
157 &mut self,
158 mut user_ids: Vec<u64>,
159 cx: &mut ModelContext<Self>,
160 ) -> Task<Result<Vec<Arc<User>>>> {
161 user_ids.retain(|id| !self.users.contains_key(id));
162 self.load_users(proto::GetUsers { user_ids }, cx)
163 }
164
165 pub fn fuzzy_search_users(
166 &mut self,
167 query: String,
168 cx: &mut ModelContext<Self>,
169 ) -> Task<Result<Vec<Arc<User>>>> {
170 self.load_users(proto::FuzzySearchUsers { query }, cx)
171 }
172
173 pub fn fetch_user(
174 &mut self,
175 user_id: u64,
176 cx: &mut ModelContext<Self>,
177 ) -> Task<Result<Arc<User>>> {
178 if let Some(user) = self.users.get(&user_id).cloned() {
179 return cx.foreground().spawn(async move { Ok(user) });
180 }
181
182 let load_users = self.get_users(vec![user_id], cx);
183 cx.spawn(|this, mut cx| async move {
184 load_users.await?;
185 this.update(&mut cx, |this, _| {
186 this.users
187 .get(&user_id)
188 .cloned()
189 .ok_or_else(|| anyhow!("server responded with no users"))
190 })
191 })
192 }
193
194 pub fn current_user(&self) -> Option<Arc<User>> {
195 self.current_user.borrow().clone()
196 }
197
198 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
199 self.current_user.clone()
200 }
201
202 fn load_users(
203 &mut self,
204 request: impl RequestMessage<Response = UsersResponse>,
205 cx: &mut ModelContext<Self>,
206 ) -> Task<Result<Vec<Arc<User>>>> {
207 let client = self.client.clone();
208 let http = self.http.clone();
209 cx.spawn_weak(|this, mut cx| async move {
210 if let Some(rpc) = client.upgrade() {
211 let response = rpc.request(request).await?;
212 let users = future::join_all(
213 response
214 .users
215 .into_iter()
216 .map(|user| User::new(user, http.as_ref())),
217 )
218 .await;
219
220 if let Some(this) = this.upgrade(&cx) {
221 this.update(&mut cx, |this, _| {
222 for user in &users {
223 this.users.insert(user.id, user.clone());
224 }
225 });
226 }
227 Ok(users)
228 } else {
229 Ok(Vec::new())
230 }
231 })
232 }
233}
234
235impl User {
236 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
237 Arc::new(User {
238 id: message.id,
239 github_login: message.github_login,
240 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
241 })
242 }
243}
244
245impl Contact {
246 async fn from_proto(
247 contact: proto::Contact,
248 user_store: &ModelHandle<UserStore>,
249 cx: &mut AsyncAppContext,
250 ) -> Result<Self> {
251 let user = user_store
252 .update(cx, |user_store, cx| {
253 user_store.fetch_user(contact.user_id, cx)
254 })
255 .await?;
256 let mut projects = Vec::new();
257 for project in contact.projects {
258 let mut guests = Vec::new();
259 for participant_id in project.guests {
260 guests.push(
261 user_store
262 .update(cx, |user_store, cx| {
263 user_store.fetch_user(participant_id, cx)
264 })
265 .await?,
266 );
267 }
268 projects.push(ProjectMetadata {
269 id: project.id,
270 worktree_root_names: project.worktree_root_names.clone(),
271 is_shared: project.is_shared,
272 guests,
273 });
274 }
275 Ok(Self { user, projects })
276 }
277}
278
279async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
280 let mut response = http
281 .get(url, Default::default(), true)
282 .await
283 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
284
285 if !response.status().is_success() {
286 return Err(anyhow!("avatar request failed {:?}", response.status()));
287 }
288
289 let mut body = Vec::new();
290 response
291 .body_mut()
292 .read_to_end(&mut body)
293 .await
294 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
295 let format = image::guess_format(&body)?;
296 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
297 Ok(ImageData::new(image))
298}