1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Result};
3use futures::{future, AsyncReadExt, Future};
4use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
5use postage::{prelude::Stream, sink::Sink, watch};
6use rpc::proto::{RequestMessage, UsersResponse};
7use std::{
8 collections::{HashMap, HashSet},
9 sync::{Arc, Weak},
10};
11use util::TryFutureExt as _;
12
/// A user record built from the server's `proto::User` message.
#[derive(Debug)]
pub struct User {
    /// Identifier taken from the server message; used as the key in `UserStore`'s user cache.
    pub id: u64,
    /// GitHub login name; the contact list is sorted and searched by this value.
    pub github_login: String,
    /// Decoded avatar image, or `None` when fetching or decoding the avatar failed.
    pub avatar: Option<Arc<ImageData>>,
}
19
/// An entry in the contact list: a user plus the projects reported for them.
#[derive(Debug)]
pub struct Contact {
    /// The contact's user record.
    pub user: Arc<User>,
    /// Projects listed for this contact in the server's contact message.
    pub projects: Vec<ProjectMetadata>,
}
25
/// Summary of one project attached to a [`Contact`], built from the server's contact message.
#[derive(Debug)]
pub struct ProjectMetadata {
    /// Project identifier copied from the server message.
    pub id: u64,
    /// Sharing flag copied from the server message.
    pub is_shared: bool,
    /// Root names of the project's worktrees, as reported by the server.
    pub worktree_root_names: Vec<String>,
    /// Users listed as guests of the project.
    pub guests: Vec<Arc<User>>,
}
33
/// Model that caches [`User`]s and tracks the contact list and the current user.
pub struct UserStore {
    /// Cache of every user loaded so far, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Sender side of the watch channel that carries incoming `UpdateContacts` messages.
    update_contacts_tx: watch::Sender<Option<proto::UpdateContacts>>,
    /// Receiver for the current user; `None` is published on sign-out or when the fetch fails.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept sorted by `github_login` so that `has_contact` can binary-search them.
    contacts: Vec<Arc<Contact>>,
    /// Weak reference to the RPC client; it is upgraded each time a request is made.
    client: Weak<Client>,
    /// HTTP client used to download avatars.
    http: Arc<dyn HttpClient>,
    /// Task that applies contact updates received over the watch channel.
    _maintain_contacts: Task<()>,
    /// Task that follows the client's status and publishes the current user.
    _maintain_current_user: Task<()>,
}
44
/// Event type of [`UserStore`]; it has no variants, so no value of it can be constructed.
pub enum Event {}
46
/// Makes `UserStore` a gpui entity whose associated event type is [`Event`].
impl Entity for UserStore {
    type Event = Event;
}
50
51impl UserStore {
52 pub fn new(
53 client: Arc<Client>,
54 http: Arc<dyn HttpClient>,
55 cx: &mut ModelContext<Self>,
56 ) -> Self {
57 let (mut current_user_tx, current_user_rx) = watch::channel();
58 let (update_contacts_tx, mut update_contacts_rx) =
59 watch::channel::<Option<proto::UpdateContacts>>();
60 let rpc_subscription =
61 client.add_message_handler(cx.handle(), Self::handle_update_contacts);
62 Self {
63 users: Default::default(),
64 current_user: current_user_rx,
65 contacts: Default::default(),
66 client: Arc::downgrade(&client),
67 update_contacts_tx,
68 http,
69 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
70 let _subscription = rpc_subscription;
71 while let Some(message) = update_contacts_rx.recv().await {
72 if let Some((message, this)) = message.zip(this.upgrade(&cx)) {
73 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
74 .log_err()
75 .await;
76 }
77 }
78 }),
79 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
80 let mut status = client.status();
81 while let Some(status) = status.recv().await {
82 match status {
83 Status::Connected { .. } => {
84 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
85 let user = this
86 .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
87 .log_err()
88 .await;
89 current_user_tx.send(user).await.ok();
90 }
91 }
92 Status::SignedOut => {
93 current_user_tx.send(None).await.ok();
94 }
95 _ => {}
96 }
97 }
98 }),
99 }
100 }
101
102 async fn handle_update_contacts(
103 this: ModelHandle<Self>,
104 msg: TypedEnvelope<proto::UpdateContacts>,
105 _: Arc<Client>,
106 mut cx: AsyncAppContext,
107 ) -> Result<()> {
108 this.update(&mut cx, |this, _| {
109 *this.update_contacts_tx.borrow_mut() = Some(msg.payload);
110 });
111 Ok(())
112 }
113
114 fn update_contacts(
115 &mut self,
116 message: proto::UpdateContacts,
117 cx: &mut ModelContext<Self>,
118 ) -> Task<Result<()>> {
119 let mut user_ids = HashSet::new();
120 for contact in &message.contacts {
121 user_ids.insert(contact.user_id);
122 user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
123 }
124 user_ids.extend(message.pending_requests_to_user_ids.iter());
125 user_ids.extend(
126 message
127 .pending_requests_from_user_ids
128 .iter()
129 .map(|req| req.user_id),
130 );
131
132 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
133 cx.spawn(|this, mut cx| async move {
134 load_users.await?;
135
136 let mut contacts = Vec::new();
137 for contact in message.contacts {
138 contacts.push(Arc::new(
139 Contact::from_proto(contact, &this, &mut cx).await?,
140 ));
141 }
142
143 this.update(&mut cx, |this, cx| {
144 contacts.sort_by(|a, b| a.user.github_login.cmp(&b.user.github_login));
145 this.contacts = contacts;
146 cx.notify();
147 });
148
149 Ok(())
150 })
151 }
152
153 pub fn contacts(&self) -> &[Arc<Contact>] {
154 &self.contacts
155 }
156
157 pub fn has_contact(&self, user: &Arc<User>) -> bool {
158 self.contacts
159 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
160 .is_ok()
161 }
162
163 pub fn request_contact(&self, to_user_id: u64) -> impl Future<Output = Result<()>> {
164 let client = self.client.upgrade();
165 async move {
166 client
167 .ok_or_else(|| anyhow!("not logged in"))?
168 .request(proto::RequestContact { to_user_id })
169 .await?;
170 Ok(())
171 }
172 }
173
174 pub fn respond_to_contact_request(
175 &self,
176 from_user_id: u64,
177 accept: bool,
178 ) -> impl Future<Output = Result<()>> {
179 let client = self.client.upgrade();
180 async move {
181 client
182 .ok_or_else(|| anyhow!("not logged in"))?
183 .request(proto::RespondToContactRequest {
184 requesting_user_id: from_user_id,
185 response: if accept {
186 proto::ContactRequestResponse::Accept
187 } else {
188 proto::ContactRequestResponse::Reject
189 } as i32,
190 })
191 .await?;
192 Ok(())
193 }
194 }
195
196 pub fn get_users(
197 &mut self,
198 mut user_ids: Vec<u64>,
199 cx: &mut ModelContext<Self>,
200 ) -> Task<Result<Vec<Arc<User>>>> {
201 user_ids.retain(|id| !self.users.contains_key(id));
202 self.load_users(proto::GetUsers { user_ids }, cx)
203 }
204
205 pub fn fuzzy_search_users(
206 &mut self,
207 query: String,
208 cx: &mut ModelContext<Self>,
209 ) -> Task<Result<Vec<Arc<User>>>> {
210 self.load_users(proto::FuzzySearchUsers { query }, cx)
211 }
212
213 pub fn fetch_user(
214 &mut self,
215 user_id: u64,
216 cx: &mut ModelContext<Self>,
217 ) -> Task<Result<Arc<User>>> {
218 if let Some(user) = self.users.get(&user_id).cloned() {
219 return cx.foreground().spawn(async move { Ok(user) });
220 }
221
222 let load_users = self.get_users(vec![user_id], cx);
223 cx.spawn(|this, mut cx| async move {
224 load_users.await?;
225 this.update(&mut cx, |this, _| {
226 this.users
227 .get(&user_id)
228 .cloned()
229 .ok_or_else(|| anyhow!("server responded with no users"))
230 })
231 })
232 }
233
234 pub fn current_user(&self) -> Option<Arc<User>> {
235 self.current_user.borrow().clone()
236 }
237
238 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
239 self.current_user.clone()
240 }
241
242 fn load_users(
243 &mut self,
244 request: impl RequestMessage<Response = UsersResponse>,
245 cx: &mut ModelContext<Self>,
246 ) -> Task<Result<Vec<Arc<User>>>> {
247 let client = self.client.clone();
248 let http = self.http.clone();
249 cx.spawn_weak(|this, mut cx| async move {
250 if let Some(rpc) = client.upgrade() {
251 let response = rpc.request(request).await?;
252 let users = future::join_all(
253 response
254 .users
255 .into_iter()
256 .map(|user| User::new(user, http.as_ref())),
257 )
258 .await;
259
260 if let Some(this) = this.upgrade(&cx) {
261 this.update(&mut cx, |this, _| {
262 for user in &users {
263 this.users.insert(user.id, user.clone());
264 }
265 });
266 }
267 Ok(users)
268 } else {
269 Ok(Vec::new())
270 }
271 })
272 }
273}
274
275impl User {
276 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
277 Arc::new(User {
278 id: message.id,
279 github_login: message.github_login,
280 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
281 })
282 }
283}
284
285impl Contact {
286 async fn from_proto(
287 contact: proto::Contact,
288 user_store: &ModelHandle<UserStore>,
289 cx: &mut AsyncAppContext,
290 ) -> Result<Self> {
291 let user = user_store
292 .update(cx, |user_store, cx| {
293 user_store.fetch_user(contact.user_id, cx)
294 })
295 .await?;
296 let mut projects = Vec::new();
297 for project in contact.projects {
298 let mut guests = Vec::new();
299 for participant_id in project.guests {
300 guests.push(
301 user_store
302 .update(cx, |user_store, cx| {
303 user_store.fetch_user(participant_id, cx)
304 })
305 .await?,
306 );
307 }
308 projects.push(ProjectMetadata {
309 id: project.id,
310 worktree_root_names: project.worktree_root_names.clone(),
311 is_shared: project.is_shared,
312 guests,
313 });
314 }
315 Ok(Self { user, projects })
316 }
317}
318
319async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
320 let mut response = http
321 .get(url, Default::default(), true)
322 .await
323 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
324
325 if !response.status().is_success() {
326 return Err(anyhow!("avatar request failed {:?}", response.status()));
327 }
328
329 let mut body = Vec::new();
330 response
331 .body_mut()
332 .read_to_end(&mut body)
333 .await
334 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
335 let format = image::guess_format(&body)?;
336 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
337 Ok(ImageData::new(image))
338}