1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
4use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
5use postage::{prelude::Stream, sink::Sink, watch};
6use rpc::proto::{RequestMessage, UsersResponse};
7use std::{
8 collections::{hash_map::Entry, HashMap, HashSet},
9 sync::{Arc, Weak},
10};
11use util::TryFutureExt as _;
12
13#[derive(Debug)]
14pub struct User {
15 pub id: u64,
16 pub github_login: String,
17 pub avatar: Option<Arc<ImageData>>,
18}
19
20#[derive(Debug)]
21pub struct Contact {
22 pub user: Arc<User>,
23 pub online: bool,
24 pub projects: Vec<ProjectMetadata>,
25}
26
27#[derive(Debug)]
28pub struct ProjectMetadata {
29 pub id: u64,
30 pub is_shared: bool,
31 pub worktree_root_names: Vec<String>,
32 pub guests: Vec<Arc<User>>,
33}
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36pub enum ContactRequestStatus {
37 None,
38 RequestSent,
39 RequestReceived,
40 RequestAccepted,
41}
42
43pub struct UserStore {
44 users: HashMap<u64, Arc<User>>,
45 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
46 current_user: watch::Receiver<Option<Arc<User>>>,
47 contacts: Vec<Arc<Contact>>,
48 incoming_contact_requests: Vec<Arc<User>>,
49 outgoing_contact_requests: Vec<Arc<User>>,
50 pending_contact_requests: HashMap<u64, usize>,
51 client: Weak<Client>,
52 http: Arc<dyn HttpClient>,
53 _maintain_contacts: Task<()>,
54 _maintain_current_user: Task<()>,
55}
56
57pub enum Event {
58 NotifyIncomingRequest(Arc<User>),
59}
60
61impl Entity for UserStore {
62 type Event = Event;
63}
64
65enum UpdateContacts {
66 Update(proto::UpdateContacts),
67 Clear(postage::barrier::Sender),
68}
69
70impl UserStore {
71 pub fn new(
72 client: Arc<Client>,
73 http: Arc<dyn HttpClient>,
74 cx: &mut ModelContext<Self>,
75 ) -> Self {
76 let (mut current_user_tx, current_user_rx) = watch::channel();
77 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
78 let rpc_subscription =
79 client.add_message_handler(cx.handle(), Self::handle_update_contacts);
80 Self {
81 users: Default::default(),
82 current_user: current_user_rx,
83 contacts: Default::default(),
84 incoming_contact_requests: Default::default(),
85 outgoing_contact_requests: Default::default(),
86 client: Arc::downgrade(&client),
87 update_contacts_tx,
88 http,
89 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
90 let _subscription = rpc_subscription;
91 while let Some(message) = update_contacts_rx.next().await {
92 if let Some(this) = this.upgrade(&cx) {
93 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
94 .log_err()
95 .await;
96 }
97 }
98 }),
99 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
100 let mut status = client.status();
101 while let Some(status) = status.recv().await {
102 match status {
103 Status::Connected { .. } => {
104 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
105 let user = this
106 .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
107 .log_err()
108 .await;
109 current_user_tx.send(user).await.ok();
110 }
111 }
112 Status::SignedOut => {
113 current_user_tx.send(None).await.ok();
114 if let Some(this) = this.upgrade(&cx) {
115 this.update(&mut cx, |this, _| this.clear_contacts()).await;
116 }
117 }
118 Status::ConnectionLost => {
119 if let Some(this) = this.upgrade(&cx) {
120 this.update(&mut cx, |this, _| this.clear_contacts()).await;
121 }
122 }
123 _ => {}
124 }
125 }
126 }),
127 pending_contact_requests: Default::default(),
128 }
129 }
130
131 async fn handle_update_contacts(
132 this: ModelHandle<Self>,
133 msg: TypedEnvelope<proto::UpdateContacts>,
134 _: Arc<Client>,
135 mut cx: AsyncAppContext,
136 ) -> Result<()> {
137 this.update(&mut cx, |this, _| {
138 this.update_contacts_tx
139 .unbounded_send(UpdateContacts::Update(msg.payload))
140 .unwrap();
141 });
142 Ok(())
143 }
144
145 fn update_contacts(
146 &mut self,
147 message: UpdateContacts,
148 cx: &mut ModelContext<Self>,
149 ) -> Task<Result<()>> {
150 match message {
151 UpdateContacts::Clear(barrier) => {
152 self.contacts.clear();
153 self.incoming_contact_requests.clear();
154 self.outgoing_contact_requests.clear();
155 drop(barrier);
156 Task::ready(Ok(()))
157 }
158 UpdateContacts::Update(message) => {
159 log::info!(
160 "update contacts on client {}: {:?}",
161 self.client.upgrade().unwrap().id,
162 message
163 );
164 let mut user_ids = HashSet::new();
165 for contact in &message.contacts {
166 user_ids.insert(contact.user_id);
167 user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
168 }
169 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
170 user_ids.extend(message.outgoing_requests.iter());
171
172 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
173 cx.spawn(|this, mut cx| async move {
174 load_users.await?;
175
176 // Users are fetched in parallel above and cached in call to get_users
177 // No need to paralellize here
178 let mut updated_contacts = Vec::new();
179 for contact in message.contacts {
180 updated_contacts.push(Arc::new(
181 Contact::from_proto(contact, &this, &mut cx).await?,
182 ));
183 }
184
185 let mut incoming_requests = Vec::new();
186 for request in message.incoming_requests {
187 incoming_requests.push({
188 let user = this
189 .update(&mut cx, |this, cx| {
190 this.fetch_user(request.requester_id, cx)
191 })
192 .await?;
193 (user, request.should_notify)
194 });
195 }
196
197 let mut outgoing_requests = Vec::new();
198 for requested_user_id in message.outgoing_requests {
199 outgoing_requests.push(
200 this.update(&mut cx, |this, cx| this.fetch_user(requested_user_id, cx))
201 .await?,
202 );
203 }
204
205 let removed_contacts =
206 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
207 let removed_incoming_requests =
208 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
209 let removed_outgoing_requests =
210 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
211
212 this.update(&mut cx, |this, cx| {
213 // Remove contacts
214 this.contacts
215 .retain(|contact| !removed_contacts.contains(&contact.user.id));
216 // Update existing contacts and insert new ones
217 for updated_contact in updated_contacts {
218 match this.contacts.binary_search_by_key(
219 &&updated_contact.user.github_login,
220 |contact| &contact.user.github_login,
221 ) {
222 Ok(ix) => this.contacts[ix] = updated_contact,
223 Err(ix) => this.contacts.insert(ix, updated_contact),
224 }
225 }
226
227 // Remove incoming contact requests
228 this.incoming_contact_requests
229 .retain(|user| !removed_incoming_requests.contains(&user.id));
230 // Update existing incoming requests and insert new ones
231 for (user, should_notify) in incoming_requests {
232 if should_notify {
233 cx.emit(Event::NotifyIncomingRequest(user.clone()));
234 }
235
236 match this
237 .incoming_contact_requests
238 .binary_search_by_key(&&user.github_login, |contact| {
239 &contact.github_login
240 }) {
241 Ok(ix) => this.incoming_contact_requests[ix] = user,
242 Err(ix) => this.incoming_contact_requests.insert(ix, user),
243 }
244 }
245
246 // Remove outgoing contact requests
247 this.outgoing_contact_requests
248 .retain(|user| !removed_outgoing_requests.contains(&user.id));
249 // Update existing incoming requests and insert new ones
250 for request in outgoing_requests {
251 match this
252 .outgoing_contact_requests
253 .binary_search_by_key(&&request.github_login, |contact| {
254 &contact.github_login
255 }) {
256 Ok(ix) => this.outgoing_contact_requests[ix] = request,
257 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
258 }
259 }
260
261 cx.notify();
262 });
263
264 Ok(())
265 })
266 }
267 }
268 }
269
270 pub fn contacts(&self) -> &[Arc<Contact>] {
271 &self.contacts
272 }
273
274 pub fn has_contact(&self, user: &Arc<User>) -> bool {
275 self.contacts
276 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
277 .is_ok()
278 }
279
280 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
281 &self.incoming_contact_requests
282 }
283
284 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
285 &self.outgoing_contact_requests
286 }
287
288 pub fn is_contact_request_pending(&self, user: &User) -> bool {
289 self.pending_contact_requests.contains_key(&user.id)
290 }
291
292 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
293 if self
294 .contacts
295 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
296 .is_ok()
297 {
298 ContactRequestStatus::RequestAccepted
299 } else if self
300 .outgoing_contact_requests
301 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
302 .is_ok()
303 {
304 ContactRequestStatus::RequestSent
305 } else if self
306 .incoming_contact_requests
307 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
308 .is_ok()
309 {
310 ContactRequestStatus::RequestReceived
311 } else {
312 ContactRequestStatus::None
313 }
314 }
315
316 pub fn request_contact(
317 &mut self,
318 responder_id: u64,
319 cx: &mut ModelContext<Self>,
320 ) -> Task<Result<()>> {
321 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
322 }
323
324 pub fn remove_contact(
325 &mut self,
326 user_id: u64,
327 cx: &mut ModelContext<Self>,
328 ) -> Task<Result<()>> {
329 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
330 }
331
332 pub fn respond_to_contact_request(
333 &mut self,
334 requester_id: u64,
335 accept: bool,
336 cx: &mut ModelContext<Self>,
337 ) -> Task<Result<()>> {
338 self.perform_contact_request(
339 requester_id,
340 proto::RespondToContactRequest {
341 requester_id,
342 response: if accept {
343 proto::ContactRequestResponse::Accept
344 } else {
345 proto::ContactRequestResponse::Reject
346 } as i32,
347 },
348 cx,
349 )
350 }
351
352 fn perform_contact_request<T: RequestMessage>(
353 &mut self,
354 user_id: u64,
355 request: T,
356 cx: &mut ModelContext<Self>,
357 ) -> Task<Result<()>> {
358 let client = self.client.upgrade();
359 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
360 cx.notify();
361
362 cx.spawn(|this, mut cx| async move {
363 let response = client
364 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
365 .request(request)
366 .await;
367 this.update(&mut cx, |this, cx| {
368 if let Entry::Occupied(mut request_count) =
369 this.pending_contact_requests.entry(user_id)
370 {
371 *request_count.get_mut() -= 1;
372 if *request_count.get() == 0 {
373 request_count.remove();
374 }
375 }
376 cx.notify();
377 });
378 response?;
379 Ok(())
380 })
381 }
382
383 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
384 let (tx, mut rx) = postage::barrier::channel();
385 self.update_contacts_tx
386 .unbounded_send(UpdateContacts::Clear(tx))
387 .unwrap();
388 async move {
389 rx.recv().await;
390 }
391 }
392
393 pub fn get_users(
394 &mut self,
395 mut user_ids: Vec<u64>,
396 cx: &mut ModelContext<Self>,
397 ) -> Task<Result<()>> {
398 user_ids.retain(|id| !self.users.contains_key(id));
399 if user_ids.is_empty() {
400 Task::ready(Ok(()))
401 } else {
402 let load = self.load_users(proto::GetUsers { user_ids }, cx);
403 cx.foreground().spawn(async move {
404 load.await?;
405 Ok(())
406 })
407 }
408 }
409
410 pub fn fuzzy_search_users(
411 &mut self,
412 query: String,
413 cx: &mut ModelContext<Self>,
414 ) -> Task<Result<Vec<Arc<User>>>> {
415 self.load_users(proto::FuzzySearchUsers { query }, cx)
416 }
417
418 pub fn fetch_user(
419 &mut self,
420 user_id: u64,
421 cx: &mut ModelContext<Self>,
422 ) -> Task<Result<Arc<User>>> {
423 if let Some(user) = self.users.get(&user_id).cloned() {
424 return cx.foreground().spawn(async move { Ok(user) });
425 }
426
427 let load_users = self.get_users(vec![user_id], cx);
428 cx.spawn(|this, mut cx| async move {
429 load_users.await?;
430 this.update(&mut cx, |this, _| {
431 this.users
432 .get(&user_id)
433 .cloned()
434 .ok_or_else(|| anyhow!("server responded with no users"))
435 })
436 })
437 }
438
439 pub fn current_user(&self) -> Option<Arc<User>> {
440 self.current_user.borrow().clone()
441 }
442
443 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
444 self.current_user.clone()
445 }
446
447 fn load_users(
448 &mut self,
449 request: impl RequestMessage<Response = UsersResponse>,
450 cx: &mut ModelContext<Self>,
451 ) -> Task<Result<Vec<Arc<User>>>> {
452 let client = self.client.clone();
453 let http = self.http.clone();
454 cx.spawn_weak(|this, mut cx| async move {
455 if let Some(rpc) = client.upgrade() {
456 let response = rpc.request(request).await.context("error loading users")?;
457 let users = future::join_all(
458 response
459 .users
460 .into_iter()
461 .map(|user| User::new(user, http.as_ref())),
462 )
463 .await;
464
465 if let Some(this) = this.upgrade(&cx) {
466 this.update(&mut cx, |this, _| {
467 for user in &users {
468 this.users.insert(user.id, user.clone());
469 }
470 });
471 }
472 Ok(users)
473 } else {
474 Ok(Vec::new())
475 }
476 })
477 }
478}
479
480impl User {
481 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
482 Arc::new(User {
483 id: message.id,
484 github_login: message.github_login,
485 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
486 })
487 }
488}
489
490impl Contact {
491 async fn from_proto(
492 contact: proto::Contact,
493 user_store: &ModelHandle<UserStore>,
494 cx: &mut AsyncAppContext,
495 ) -> Result<Self> {
496 let user = user_store
497 .update(cx, |user_store, cx| {
498 user_store.fetch_user(contact.user_id, cx)
499 })
500 .await?;
501 let mut projects = Vec::new();
502 for project in contact.projects {
503 let mut guests = Vec::new();
504 for participant_id in project.guests {
505 guests.push(
506 user_store
507 .update(cx, |user_store, cx| {
508 user_store.fetch_user(participant_id, cx)
509 })
510 .await?,
511 );
512 }
513 projects.push(ProjectMetadata {
514 id: project.id,
515 worktree_root_names: project.worktree_root_names.clone(),
516 is_shared: project.is_shared,
517 guests,
518 });
519 }
520 Ok(Self {
521 user,
522 online: contact.online,
523 projects,
524 })
525 }
526
527 pub fn non_empty_projects(&self) -> impl Iterator<Item = &ProjectMetadata> {
528 self.projects
529 .iter()
530 .filter(|project| !project.worktree_root_names.is_empty())
531 }
532}
533
534async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
535 let mut response = http
536 .get(url, Default::default(), true)
537 .await
538 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
539
540 if !response.status().is_success() {
541 return Err(anyhow!("avatar request failed {:?}", response.status()));
542 }
543
544 let mut body = Vec::new();
545 response
546 .body_mut()
547 .read_to_end(&mut body)
548 .await
549 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
550 let format = image::guess_format(&body)?;
551 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
552 Ok(ImageData::new(image))
553}