1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
4use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
5use postage::{prelude::Stream, sink::Sink, watch};
6use rpc::proto::{RequestMessage, UsersResponse};
7use std::{
8 collections::{hash_map::Entry, HashMap, HashSet},
9 sync::{Arc, Weak},
10};
11use util::TryFutureExt as _;
12
13#[derive(Debug)]
14pub struct User {
15 pub id: u64,
16 pub github_login: String,
17 pub avatar: Option<Arc<ImageData>>,
18}
19
20impl PartialEq for User {
21 fn eq(&self, other: &Self) -> bool {
22 self.id == other.id && self.github_login == other.github_login
23 }
24}
25
26#[derive(Debug)]
27pub struct Contact {
28 pub user: Arc<User>,
29 pub online: bool,
30 pub projects: Vec<ProjectMetadata>,
31}
32
33#[derive(Debug)]
34pub struct ProjectMetadata {
35 pub id: u64,
36 pub worktree_root_names: Vec<String>,
37 pub guests: Vec<Arc<User>>,
38}
39
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user appears in none of the store's lists.
    None,
    /// The user is in the outgoing request list.
    RequestSent,
    /// The user is in the incoming request list.
    RequestReceived,
    /// The user is already in the contact list.
    RequestAccepted,
}
47
48pub struct UserStore {
49 users: HashMap<u64, Arc<User>>,
50 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
51 current_user: watch::Receiver<Option<Arc<User>>>,
52 contacts: Vec<Arc<Contact>>,
53 incoming_contact_requests: Vec<Arc<User>>,
54 outgoing_contact_requests: Vec<Arc<User>>,
55 pending_contact_requests: HashMap<u64, usize>,
56 client: Weak<Client>,
57 http: Arc<dyn HttpClient>,
58 _maintain_contacts: Task<()>,
59 _maintain_current_user: Task<()>,
60}
61
62#[derive(Clone)]
63pub struct ContactEvent {
64 pub user: Arc<User>,
65 pub kind: ContactEventKind,
66}
67
/// The kind of change a [`ContactEvent`] reports.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    /// Emitted for an incoming contact request flagged `should_notify`.
    Requested,
    /// Emitted for an updated contact flagged `should_notify`.
    Accepted,
    /// Emitted when an incoming contact request is removed.
    Cancelled,
}
74
75impl Entity for UserStore {
76 type Event = ContactEvent;
77}
78
79enum UpdateContacts {
80 Update(proto::UpdateContacts),
81 Clear(postage::barrier::Sender),
82}
83
84impl UserStore {
85 pub fn new(
86 client: Arc<Client>,
87 http: Arc<dyn HttpClient>,
88 cx: &mut ModelContext<Self>,
89 ) -> Self {
90 let (mut current_user_tx, current_user_rx) = watch::channel();
91 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
92 let rpc_subscription =
93 client.add_message_handler(cx.handle(), Self::handle_update_contacts);
94 Self {
95 users: Default::default(),
96 current_user: current_user_rx,
97 contacts: Default::default(),
98 incoming_contact_requests: Default::default(),
99 outgoing_contact_requests: Default::default(),
100 client: Arc::downgrade(&client),
101 update_contacts_tx,
102 http,
103 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
104 let _subscription = rpc_subscription;
105 while let Some(message) = update_contacts_rx.next().await {
106 if let Some(this) = this.upgrade(&cx) {
107 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
108 .log_err()
109 .await;
110 }
111 }
112 }),
113 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
114 let mut status = client.status();
115 while let Some(status) = status.recv().await {
116 match status {
117 Status::Connected { .. } => {
118 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
119 let user = this
120 .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
121 .log_err()
122 .await;
123 current_user_tx.send(user).await.ok();
124 }
125 }
126 Status::SignedOut => {
127 current_user_tx.send(None).await.ok();
128 if let Some(this) = this.upgrade(&cx) {
129 this.update(&mut cx, |this, _| this.clear_contacts()).await;
130 }
131 }
132 Status::ConnectionLost => {
133 if let Some(this) = this.upgrade(&cx) {
134 this.update(&mut cx, |this, _| this.clear_contacts()).await;
135 }
136 }
137 _ => {}
138 }
139 }
140 }),
141 pending_contact_requests: Default::default(),
142 }
143 }
144
145 async fn handle_update_contacts(
146 this: ModelHandle<Self>,
147 msg: TypedEnvelope<proto::UpdateContacts>,
148 _: Arc<Client>,
149 mut cx: AsyncAppContext,
150 ) -> Result<()> {
151 this.update(&mut cx, |this, _| {
152 this.update_contacts_tx
153 .unbounded_send(UpdateContacts::Update(msg.payload))
154 .unwrap();
155 });
156 Ok(())
157 }
158
159 fn update_contacts(
160 &mut self,
161 message: UpdateContacts,
162 cx: &mut ModelContext<Self>,
163 ) -> Task<Result<()>> {
164 match message {
165 UpdateContacts::Clear(barrier) => {
166 self.contacts.clear();
167 self.incoming_contact_requests.clear();
168 self.outgoing_contact_requests.clear();
169 drop(barrier);
170 Task::ready(Ok(()))
171 }
172 UpdateContacts::Update(message) => {
173 log::info!(
174 "update contacts on client {}: {:?}",
175 self.client.upgrade().unwrap().id,
176 message
177 );
178 let mut user_ids = HashSet::new();
179 for contact in &message.contacts {
180 user_ids.insert(contact.user_id);
181 user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
182 }
183 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
184 user_ids.extend(message.outgoing_requests.iter());
185
186 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
187 cx.spawn(|this, mut cx| async move {
188 load_users.await?;
189
190 // Users are fetched in parallel above and cached in call to get_users
191 // No need to paralellize here
192 let mut updated_contacts = Vec::new();
193 for contact in message.contacts {
194 let should_notify = contact.should_notify;
195 updated_contacts.push((
196 Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
197 should_notify,
198 ));
199 }
200
201 let mut incoming_requests = Vec::new();
202 for request in message.incoming_requests {
203 incoming_requests.push({
204 let user = this
205 .update(&mut cx, |this, cx| {
206 this.fetch_user(request.requester_id, cx)
207 })
208 .await?;
209 (user, request.should_notify)
210 });
211 }
212
213 let mut outgoing_requests = Vec::new();
214 for requested_user_id in message.outgoing_requests {
215 outgoing_requests.push(
216 this.update(&mut cx, |this, cx| this.fetch_user(requested_user_id, cx))
217 .await?,
218 );
219 }
220
221 let removed_contacts =
222 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
223 let removed_incoming_requests =
224 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
225 let removed_outgoing_requests =
226 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
227
228 this.update(&mut cx, |this, cx| {
229 // Remove contacts
230 this.contacts
231 .retain(|contact| !removed_contacts.contains(&contact.user.id));
232 // Update existing contacts and insert new ones
233 for (updated_contact, should_notify) in updated_contacts {
234 if should_notify {
235 cx.emit(ContactEvent {
236 user: updated_contact.user.clone(),
237 kind: ContactEventKind::Accepted,
238 });
239 }
240 match this.contacts.binary_search_by_key(
241 &&updated_contact.user.github_login,
242 |contact| &contact.user.github_login,
243 ) {
244 Ok(ix) => this.contacts[ix] = updated_contact,
245 Err(ix) => this.contacts.insert(ix, updated_contact),
246 }
247 }
248
249 // Remove incoming contact requests
250 this.incoming_contact_requests.retain(|user| {
251 if removed_incoming_requests.contains(&user.id) {
252 cx.emit(ContactEvent {
253 user: user.clone(),
254 kind: ContactEventKind::Cancelled,
255 });
256 false
257 } else {
258 true
259 }
260 });
261 // Update existing incoming requests and insert new ones
262 for (user, should_notify) in incoming_requests {
263 if should_notify {
264 cx.emit(ContactEvent {
265 user: user.clone(),
266 kind: ContactEventKind::Requested,
267 });
268 }
269
270 match this
271 .incoming_contact_requests
272 .binary_search_by_key(&&user.github_login, |contact| {
273 &contact.github_login
274 }) {
275 Ok(ix) => this.incoming_contact_requests[ix] = user,
276 Err(ix) => this.incoming_contact_requests.insert(ix, user),
277 }
278 }
279
280 // Remove outgoing contact requests
281 this.outgoing_contact_requests
282 .retain(|user| !removed_outgoing_requests.contains(&user.id));
283 // Update existing incoming requests and insert new ones
284 for request in outgoing_requests {
285 match this
286 .outgoing_contact_requests
287 .binary_search_by_key(&&request.github_login, |contact| {
288 &contact.github_login
289 }) {
290 Ok(ix) => this.outgoing_contact_requests[ix] = request,
291 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
292 }
293 }
294
295 cx.notify();
296 });
297
298 Ok(())
299 })
300 }
301 }
302 }
303
304 pub fn contacts(&self) -> &[Arc<Contact>] {
305 &self.contacts
306 }
307
308 pub fn has_contact(&self, user: &Arc<User>) -> bool {
309 self.contacts
310 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
311 .is_ok()
312 }
313
314 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
315 &self.incoming_contact_requests
316 }
317
318 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
319 &self.outgoing_contact_requests
320 }
321
322 pub fn is_contact_request_pending(&self, user: &User) -> bool {
323 self.pending_contact_requests.contains_key(&user.id)
324 }
325
326 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
327 if self
328 .contacts
329 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
330 .is_ok()
331 {
332 ContactRequestStatus::RequestAccepted
333 } else if self
334 .outgoing_contact_requests
335 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
336 .is_ok()
337 {
338 ContactRequestStatus::RequestSent
339 } else if self
340 .incoming_contact_requests
341 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
342 .is_ok()
343 {
344 ContactRequestStatus::RequestReceived
345 } else {
346 ContactRequestStatus::None
347 }
348 }
349
350 pub fn request_contact(
351 &mut self,
352 responder_id: u64,
353 cx: &mut ModelContext<Self>,
354 ) -> Task<Result<()>> {
355 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
356 }
357
358 pub fn remove_contact(
359 &mut self,
360 user_id: u64,
361 cx: &mut ModelContext<Self>,
362 ) -> Task<Result<()>> {
363 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
364 }
365
366 pub fn respond_to_contact_request(
367 &mut self,
368 requester_id: u64,
369 accept: bool,
370 cx: &mut ModelContext<Self>,
371 ) -> Task<Result<()>> {
372 self.perform_contact_request(
373 requester_id,
374 proto::RespondToContactRequest {
375 requester_id,
376 response: if accept {
377 proto::ContactRequestResponse::Accept
378 } else {
379 proto::ContactRequestResponse::Decline
380 } as i32,
381 },
382 cx,
383 )
384 }
385
386 pub fn dismiss_contact_request(
387 &mut self,
388 requester_id: u64,
389 cx: &mut ModelContext<Self>,
390 ) -> Task<Result<()>> {
391 let client = self.client.upgrade();
392 cx.spawn_weak(|_, _| async move {
393 client
394 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
395 .request(proto::RespondToContactRequest {
396 requester_id,
397 response: proto::ContactRequestResponse::Dismiss as i32,
398 })
399 .await?;
400 Ok(())
401 })
402 }
403
404 fn perform_contact_request<T: RequestMessage>(
405 &mut self,
406 user_id: u64,
407 request: T,
408 cx: &mut ModelContext<Self>,
409 ) -> Task<Result<()>> {
410 let client = self.client.upgrade();
411 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
412 cx.notify();
413
414 cx.spawn(|this, mut cx| async move {
415 let response = client
416 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
417 .request(request)
418 .await;
419 this.update(&mut cx, |this, cx| {
420 if let Entry::Occupied(mut request_count) =
421 this.pending_contact_requests.entry(user_id)
422 {
423 *request_count.get_mut() -= 1;
424 if *request_count.get() == 0 {
425 request_count.remove();
426 }
427 }
428 cx.notify();
429 });
430 response?;
431 Ok(())
432 })
433 }
434
435 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
436 let (tx, mut rx) = postage::barrier::channel();
437 self.update_contacts_tx
438 .unbounded_send(UpdateContacts::Clear(tx))
439 .unwrap();
440 async move {
441 rx.recv().await;
442 }
443 }
444
445 pub fn get_users(
446 &mut self,
447 mut user_ids: Vec<u64>,
448 cx: &mut ModelContext<Self>,
449 ) -> Task<Result<()>> {
450 user_ids.retain(|id| !self.users.contains_key(id));
451 if user_ids.is_empty() {
452 Task::ready(Ok(()))
453 } else {
454 let load = self.load_users(proto::GetUsers { user_ids }, cx);
455 cx.foreground().spawn(async move {
456 load.await?;
457 Ok(())
458 })
459 }
460 }
461
462 pub fn fuzzy_search_users(
463 &mut self,
464 query: String,
465 cx: &mut ModelContext<Self>,
466 ) -> Task<Result<Vec<Arc<User>>>> {
467 self.load_users(proto::FuzzySearchUsers { query }, cx)
468 }
469
470 pub fn fetch_user(
471 &mut self,
472 user_id: u64,
473 cx: &mut ModelContext<Self>,
474 ) -> Task<Result<Arc<User>>> {
475 if let Some(user) = self.users.get(&user_id).cloned() {
476 return cx.foreground().spawn(async move { Ok(user) });
477 }
478
479 let load_users = self.get_users(vec![user_id], cx);
480 cx.spawn(|this, mut cx| async move {
481 load_users.await?;
482 this.update(&mut cx, |this, _| {
483 this.users
484 .get(&user_id)
485 .cloned()
486 .ok_or_else(|| anyhow!("server responded with no users"))
487 })
488 })
489 }
490
491 pub fn current_user(&self) -> Option<Arc<User>> {
492 self.current_user.borrow().clone()
493 }
494
495 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
496 self.current_user.clone()
497 }
498
499 fn load_users(
500 &mut self,
501 request: impl RequestMessage<Response = UsersResponse>,
502 cx: &mut ModelContext<Self>,
503 ) -> Task<Result<Vec<Arc<User>>>> {
504 let client = self.client.clone();
505 let http = self.http.clone();
506 cx.spawn_weak(|this, mut cx| async move {
507 if let Some(rpc) = client.upgrade() {
508 let response = rpc.request(request).await.context("error loading users")?;
509 let users = future::join_all(
510 response
511 .users
512 .into_iter()
513 .map(|user| User::new(user, http.as_ref())),
514 )
515 .await;
516
517 if let Some(this) = this.upgrade(&cx) {
518 this.update(&mut cx, |this, _| {
519 for user in &users {
520 this.users.insert(user.id, user.clone());
521 }
522 });
523 }
524 Ok(users)
525 } else {
526 Ok(Vec::new())
527 }
528 })
529 }
530}
531
532impl User {
533 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
534 Arc::new(User {
535 id: message.id,
536 github_login: message.github_login,
537 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
538 })
539 }
540}
541
542impl Contact {
543 async fn from_proto(
544 contact: proto::Contact,
545 user_store: &ModelHandle<UserStore>,
546 cx: &mut AsyncAppContext,
547 ) -> Result<Self> {
548 let user = user_store
549 .update(cx, |user_store, cx| {
550 user_store.fetch_user(contact.user_id, cx)
551 })
552 .await?;
553 let mut projects = Vec::new();
554 for project in contact.projects {
555 let mut guests = Vec::new();
556 for participant_id in project.guests {
557 guests.push(
558 user_store
559 .update(cx, |user_store, cx| {
560 user_store.fetch_user(participant_id, cx)
561 })
562 .await?,
563 );
564 }
565 projects.push(ProjectMetadata {
566 id: project.id,
567 worktree_root_names: project.worktree_root_names.clone(),
568 guests,
569 });
570 }
571 Ok(Self {
572 user,
573 online: contact.online,
574 projects,
575 })
576 }
577
578 pub fn non_empty_projects(&self) -> impl Iterator<Item = &ProjectMetadata> {
579 self.projects
580 .iter()
581 .filter(|project| !project.worktree_root_names.is_empty())
582 }
583}
584
585async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
586 let mut response = http
587 .get(url, Default::default(), true)
588 .await
589 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
590
591 if !response.status().is_success() {
592 return Err(anyhow!("avatar request failed {:?}", response.status()));
593 }
594
595 let mut body = Vec::new();
596 response
597 .body_mut()
598 .read_to_end(&mut body)
599 .await
600 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
601 let format = image::guess_format(&body)?;
602 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
603 Ok(ImageData::new(image))
604}