1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
4use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
5use postage::{prelude::Stream, sink::Sink, watch};
6use rpc::proto::{RequestMessage, UsersResponse};
7use std::{
8 collections::{hash_map::Entry, HashMap, HashSet},
9 sync::{Arc, Weak},
10};
11use util::TryFutureExt as _;
12
/// A user as known to this client: identifier, GitHub login, and an avatar
/// image when one could be loaded.
#[derive(Debug)]
pub struct User {
    /// Identifier used as the key of the `UserStore` user cache.
    pub id: u64,
    /// GitHub login; contact and request lists are ordered by this value.
    pub github_login: String,
    /// Decoded avatar image, or `None` when fetching it failed.
    pub avatar: Option<Arc<ImageData>>,
}
19
/// A contact entry: the contact's user record, an online flag, and the
/// projects reported for that contact.
#[derive(Debug)]
pub struct Contact {
    /// The user this contact entry refers to.
    pub user: Arc<User>,
    /// Online flag copied from the protocol message.
    pub online: bool,
    /// Projects listed for this contact in the protocol message.
    pub projects: Vec<ProjectMetadata>,
}
26
/// Summary of one of a contact's projects.
#[derive(Debug)]
pub struct ProjectMetadata {
    /// Project identifier copied from the protocol message.
    pub id: u64,
    /// Shared flag copied from the protocol message.
    pub is_shared: bool,
    /// Root names of the project's worktrees; an empty list marks the project
    /// as "empty" for `Contact::non_empty_projects`.
    pub worktree_root_names: Vec<String>,
    /// Users listed as guests of the project.
    pub guests: Vec<Arc<User>>,
}
34
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the contact or request lists.
    None,
    /// The user is in the outgoing request list.
    RequestSent,
    /// The user is in the incoming request list.
    RequestReceived,
    /// The user is in the contact list.
    RequestAccepted,
}
42
/// Holds the users, contacts, and contact requests known to this client,
/// together with the background tasks that keep them up to date.
pub struct UserStore {
    /// Users loaded so far, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Sending half of the queue drained by `_maintain_contacts`.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Receiver for the current user value published by `_maintain_current_user`.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept ordered by `github_login` (lookups use binary search).
    contacts: Vec<Arc<Contact>>,
    /// Users with a request towards the current user, ordered by `github_login`.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users the current user has sent a request to, ordered by `github_login`.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of contact-related requests currently in flight, per user id.
    pending_contact_requests: HashMap<u64, usize>,
    /// Weak handle to the RPC client; upgraded whenever a request is made.
    client: Weak<Client>,
    /// HTTP client used to download avatars.
    http: Arc<dyn HttpClient>,
    /// Task that applies queued `UpdateContacts` messages.
    _maintain_contacts: Task<()>,
    /// Task that reacts to client status changes.
    _maintain_current_user: Task<()>,
}
56
/// Event emitted by `UserStore` when a contact-related change should be
/// surfaced for a particular user.
#[derive(Clone)]
pub struct ContactEvent {
    /// The user the event is about.
    pub user: Arc<User>,
    /// What happened with respect to that user.
    pub kind: ContactEventKind,
}
62
/// The kind of change a `ContactEvent` reports.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    /// Emitted for an incoming contact request flagged `should_notify`.
    Requested,
    /// Emitted for an updated contact flagged `should_notify`.
    Accepted,
    /// Emitted when an incoming contact request is removed.
    Cancelled,
}
69
/// `UserStore` is a gpui entity whose events are `ContactEvent`s.
impl Entity for UserStore {
    type Event = ContactEvent;
}
73
/// Messages processed, in order, by the `_maintain_contacts` task.
enum UpdateContacts {
    /// Apply a contact update received from the server.
    Update(proto::UpdateContacts),
    /// Clear all contacts and requests; the sender is dropped once that is done.
    Clear(postage::barrier::Sender),
}
78
79impl UserStore {
80 pub fn new(
81 client: Arc<Client>,
82 http: Arc<dyn HttpClient>,
83 cx: &mut ModelContext<Self>,
84 ) -> Self {
85 let (mut current_user_tx, current_user_rx) = watch::channel();
86 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
87 let rpc_subscription =
88 client.add_message_handler(cx.handle(), Self::handle_update_contacts);
89 Self {
90 users: Default::default(),
91 current_user: current_user_rx,
92 contacts: Default::default(),
93 incoming_contact_requests: Default::default(),
94 outgoing_contact_requests: Default::default(),
95 client: Arc::downgrade(&client),
96 update_contacts_tx,
97 http,
98 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
99 let _subscription = rpc_subscription;
100 while let Some(message) = update_contacts_rx.next().await {
101 if let Some(this) = this.upgrade(&cx) {
102 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
103 .log_err()
104 .await;
105 }
106 }
107 }),
108 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
109 let mut status = client.status();
110 while let Some(status) = status.recv().await {
111 match status {
112 Status::Connected { .. } => {
113 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
114 let user = this
115 .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
116 .log_err()
117 .await;
118 current_user_tx.send(user).await.ok();
119 }
120 }
121 Status::SignedOut => {
122 current_user_tx.send(None).await.ok();
123 if let Some(this) = this.upgrade(&cx) {
124 this.update(&mut cx, |this, _| this.clear_contacts()).await;
125 }
126 }
127 Status::ConnectionLost => {
128 if let Some(this) = this.upgrade(&cx) {
129 this.update(&mut cx, |this, _| this.clear_contacts()).await;
130 }
131 }
132 _ => {}
133 }
134 }
135 }),
136 pending_contact_requests: Default::default(),
137 }
138 }
139
140 async fn handle_update_contacts(
141 this: ModelHandle<Self>,
142 msg: TypedEnvelope<proto::UpdateContacts>,
143 _: Arc<Client>,
144 mut cx: AsyncAppContext,
145 ) -> Result<()> {
146 this.update(&mut cx, |this, _| {
147 this.update_contacts_tx
148 .unbounded_send(UpdateContacts::Update(msg.payload))
149 .unwrap();
150 });
151 Ok(())
152 }
153
154 fn update_contacts(
155 &mut self,
156 message: UpdateContacts,
157 cx: &mut ModelContext<Self>,
158 ) -> Task<Result<()>> {
159 match message {
160 UpdateContacts::Clear(barrier) => {
161 self.contacts.clear();
162 self.incoming_contact_requests.clear();
163 self.outgoing_contact_requests.clear();
164 drop(barrier);
165 Task::ready(Ok(()))
166 }
167 UpdateContacts::Update(message) => {
168 log::info!(
169 "update contacts on client {}: {:?}",
170 self.client.upgrade().unwrap().id,
171 message
172 );
173 let mut user_ids = HashSet::new();
174 for contact in &message.contacts {
175 user_ids.insert(contact.user_id);
176 user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
177 }
178 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
179 user_ids.extend(message.outgoing_requests.iter());
180
181 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
182 cx.spawn(|this, mut cx| async move {
183 load_users.await?;
184
185 // Users are fetched in parallel above and cached in call to get_users
186 // No need to paralellize here
187 let mut updated_contacts = Vec::new();
188 for contact in message.contacts {
189 let should_notify = contact.should_notify;
190 updated_contacts.push((
191 Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
192 should_notify,
193 ));
194 }
195
196 let mut incoming_requests = Vec::new();
197 for request in message.incoming_requests {
198 incoming_requests.push({
199 let user = this
200 .update(&mut cx, |this, cx| {
201 this.fetch_user(request.requester_id, cx)
202 })
203 .await?;
204 (user, request.should_notify)
205 });
206 }
207
208 let mut outgoing_requests = Vec::new();
209 for requested_user_id in message.outgoing_requests {
210 outgoing_requests.push(
211 this.update(&mut cx, |this, cx| this.fetch_user(requested_user_id, cx))
212 .await?,
213 );
214 }
215
216 let removed_contacts =
217 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
218 let removed_incoming_requests =
219 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
220 let removed_outgoing_requests =
221 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
222
223 this.update(&mut cx, |this, cx| {
224 // Remove contacts
225 this.contacts
226 .retain(|contact| !removed_contacts.contains(&contact.user.id));
227 // Update existing contacts and insert new ones
228 for (updated_contact, should_notify) in updated_contacts {
229 if should_notify {
230 cx.emit(ContactEvent {
231 user: updated_contact.user.clone(),
232 kind: ContactEventKind::Accepted,
233 });
234 }
235 match this.contacts.binary_search_by_key(
236 &&updated_contact.user.github_login,
237 |contact| &contact.user.github_login,
238 ) {
239 Ok(ix) => this.contacts[ix] = updated_contact,
240 Err(ix) => this.contacts.insert(ix, updated_contact),
241 }
242 }
243
244 // Remove incoming contact requests
245 this.incoming_contact_requests.retain(|user| {
246 if removed_incoming_requests.contains(&user.id) {
247 cx.emit(ContactEvent {
248 user: user.clone(),
249 kind: ContactEventKind::Cancelled,
250 });
251 false
252 } else {
253 true
254 }
255 });
256 // Update existing incoming requests and insert new ones
257 for (user, should_notify) in incoming_requests {
258 if should_notify {
259 cx.emit(ContactEvent {
260 user: user.clone(),
261 kind: ContactEventKind::Requested,
262 });
263 }
264
265 match this
266 .incoming_contact_requests
267 .binary_search_by_key(&&user.github_login, |contact| {
268 &contact.github_login
269 }) {
270 Ok(ix) => this.incoming_contact_requests[ix] = user,
271 Err(ix) => this.incoming_contact_requests.insert(ix, user),
272 }
273 }
274
275 // Remove outgoing contact requests
276 this.outgoing_contact_requests
277 .retain(|user| !removed_outgoing_requests.contains(&user.id));
278 // Update existing incoming requests and insert new ones
279 for request in outgoing_requests {
280 match this
281 .outgoing_contact_requests
282 .binary_search_by_key(&&request.github_login, |contact| {
283 &contact.github_login
284 }) {
285 Ok(ix) => this.outgoing_contact_requests[ix] = request,
286 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
287 }
288 }
289
290 cx.notify();
291 });
292
293 Ok(())
294 })
295 }
296 }
297 }
298
299 pub fn contacts(&self) -> &[Arc<Contact>] {
300 &self.contacts
301 }
302
303 pub fn has_contact(&self, user: &Arc<User>) -> bool {
304 self.contacts
305 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
306 .is_ok()
307 }
308
309 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
310 &self.incoming_contact_requests
311 }
312
313 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
314 &self.outgoing_contact_requests
315 }
316
317 pub fn is_contact_request_pending(&self, user: &User) -> bool {
318 self.pending_contact_requests.contains_key(&user.id)
319 }
320
321 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
322 if self
323 .contacts
324 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
325 .is_ok()
326 {
327 ContactRequestStatus::RequestAccepted
328 } else if self
329 .outgoing_contact_requests
330 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
331 .is_ok()
332 {
333 ContactRequestStatus::RequestSent
334 } else if self
335 .incoming_contact_requests
336 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
337 .is_ok()
338 {
339 ContactRequestStatus::RequestReceived
340 } else {
341 ContactRequestStatus::None
342 }
343 }
344
345 pub fn request_contact(
346 &mut self,
347 responder_id: u64,
348 cx: &mut ModelContext<Self>,
349 ) -> Task<Result<()>> {
350 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
351 }
352
353 pub fn remove_contact(
354 &mut self,
355 user_id: u64,
356 cx: &mut ModelContext<Self>,
357 ) -> Task<Result<()>> {
358 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
359 }
360
361 pub fn respond_to_contact_request(
362 &mut self,
363 requester_id: u64,
364 accept: bool,
365 cx: &mut ModelContext<Self>,
366 ) -> Task<Result<()>> {
367 self.perform_contact_request(
368 requester_id,
369 proto::RespondToContactRequest {
370 requester_id,
371 response: if accept {
372 proto::ContactRequestResponse::Accept
373 } else {
374 proto::ContactRequestResponse::Decline
375 } as i32,
376 },
377 cx,
378 )
379 }
380
381 pub fn dismiss_contact_request(
382 &mut self,
383 requester_id: u64,
384 cx: &mut ModelContext<Self>,
385 ) -> Task<Result<()>> {
386 let client = self.client.upgrade();
387 cx.spawn_weak(|_, _| async move {
388 client
389 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
390 .request(proto::RespondToContactRequest {
391 requester_id,
392 response: proto::ContactRequestResponse::Dismiss as i32,
393 })
394 .await?;
395 Ok(())
396 })
397 }
398
399 fn perform_contact_request<T: RequestMessage>(
400 &mut self,
401 user_id: u64,
402 request: T,
403 cx: &mut ModelContext<Self>,
404 ) -> Task<Result<()>> {
405 let client = self.client.upgrade();
406 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
407 cx.notify();
408
409 cx.spawn(|this, mut cx| async move {
410 let response = client
411 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
412 .request(request)
413 .await;
414 this.update(&mut cx, |this, cx| {
415 if let Entry::Occupied(mut request_count) =
416 this.pending_contact_requests.entry(user_id)
417 {
418 *request_count.get_mut() -= 1;
419 if *request_count.get() == 0 {
420 request_count.remove();
421 }
422 }
423 cx.notify();
424 });
425 response?;
426 Ok(())
427 })
428 }
429
430 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
431 let (tx, mut rx) = postage::barrier::channel();
432 self.update_contacts_tx
433 .unbounded_send(UpdateContacts::Clear(tx))
434 .unwrap();
435 async move {
436 rx.recv().await;
437 }
438 }
439
440 pub fn get_users(
441 &mut self,
442 mut user_ids: Vec<u64>,
443 cx: &mut ModelContext<Self>,
444 ) -> Task<Result<()>> {
445 user_ids.retain(|id| !self.users.contains_key(id));
446 if user_ids.is_empty() {
447 Task::ready(Ok(()))
448 } else {
449 let load = self.load_users(proto::GetUsers { user_ids }, cx);
450 cx.foreground().spawn(async move {
451 load.await?;
452 Ok(())
453 })
454 }
455 }
456
457 pub fn fuzzy_search_users(
458 &mut self,
459 query: String,
460 cx: &mut ModelContext<Self>,
461 ) -> Task<Result<Vec<Arc<User>>>> {
462 self.load_users(proto::FuzzySearchUsers { query }, cx)
463 }
464
465 pub fn fetch_user(
466 &mut self,
467 user_id: u64,
468 cx: &mut ModelContext<Self>,
469 ) -> Task<Result<Arc<User>>> {
470 if let Some(user) = self.users.get(&user_id).cloned() {
471 return cx.foreground().spawn(async move { Ok(user) });
472 }
473
474 let load_users = self.get_users(vec![user_id], cx);
475 cx.spawn(|this, mut cx| async move {
476 load_users.await?;
477 this.update(&mut cx, |this, _| {
478 this.users
479 .get(&user_id)
480 .cloned()
481 .ok_or_else(|| anyhow!("server responded with no users"))
482 })
483 })
484 }
485
486 pub fn current_user(&self) -> Option<Arc<User>> {
487 self.current_user.borrow().clone()
488 }
489
490 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
491 self.current_user.clone()
492 }
493
494 fn load_users(
495 &mut self,
496 request: impl RequestMessage<Response = UsersResponse>,
497 cx: &mut ModelContext<Self>,
498 ) -> Task<Result<Vec<Arc<User>>>> {
499 let client = self.client.clone();
500 let http = self.http.clone();
501 cx.spawn_weak(|this, mut cx| async move {
502 if let Some(rpc) = client.upgrade() {
503 let response = rpc.request(request).await.context("error loading users")?;
504 let users = future::join_all(
505 response
506 .users
507 .into_iter()
508 .map(|user| User::new(user, http.as_ref())),
509 )
510 .await;
511
512 if let Some(this) = this.upgrade(&cx) {
513 this.update(&mut cx, |this, _| {
514 for user in &users {
515 this.users.insert(user.id, user.clone());
516 }
517 });
518 }
519 Ok(users)
520 } else {
521 Ok(Vec::new())
522 }
523 })
524 }
525}
526
527impl User {
528 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
529 Arc::new(User {
530 id: message.id,
531 github_login: message.github_login,
532 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
533 })
534 }
535}
536
537impl Contact {
538 async fn from_proto(
539 contact: proto::Contact,
540 user_store: &ModelHandle<UserStore>,
541 cx: &mut AsyncAppContext,
542 ) -> Result<Self> {
543 let user = user_store
544 .update(cx, |user_store, cx| {
545 user_store.fetch_user(contact.user_id, cx)
546 })
547 .await?;
548 let mut projects = Vec::new();
549 for project in contact.projects {
550 let mut guests = Vec::new();
551 for participant_id in project.guests {
552 guests.push(
553 user_store
554 .update(cx, |user_store, cx| {
555 user_store.fetch_user(participant_id, cx)
556 })
557 .await?,
558 );
559 }
560 projects.push(ProjectMetadata {
561 id: project.id,
562 worktree_root_names: project.worktree_root_names.clone(),
563 is_shared: project.is_shared,
564 guests,
565 });
566 }
567 Ok(Self {
568 user,
569 online: contact.online,
570 projects,
571 })
572 }
573
574 pub fn non_empty_projects(&self) -> impl Iterator<Item = &ProjectMetadata> {
575 self.projects
576 .iter()
577 .filter(|project| !project.worktree_root_names.is_empty())
578 }
579}
580
581async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
582 let mut response = http
583 .get(url, Default::default(), true)
584 .await
585 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
586
587 if !response.status().is_success() {
588 return Err(anyhow!("avatar request failed {:?}", response.status()));
589 }
590
591 let mut body = Vec::new();
592 response
593 .body_mut()
594 .read_to_end(&mut body)
595 .await
596 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
597 let format = image::guess_format(&body)?;
598 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
599 Ok(ImageData::new(image))
600}