1use super::{http::HttpClient, proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Result};
3use futures::{future, AsyncReadExt};
4use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
5use postage::{prelude::Stream, sink::Sink, watch};
6use rpc::proto::{RequestMessage, UsersResponse};
7use std::{
8 collections::{HashMap, HashSet},
9 sync::{Arc, Weak},
10};
11use util::TryFutureExt as _;
12
13#[derive(Debug)]
14pub struct User {
15 pub id: u64,
16 pub github_login: String,
17 pub avatar: Option<Arc<ImageData>>,
18}
19
20#[derive(Debug)]
21pub struct Contact {
22 pub user: Arc<User>,
23 pub projects: Vec<ProjectMetadata>,
24}
25
26#[derive(Debug)]
27pub struct ProjectMetadata {
28 pub id: u64,
29 pub is_shared: bool,
30 pub worktree_root_names: Vec<String>,
31 pub guests: Vec<Arc<User>>,
32}
33
34pub struct UserStore {
35 users: HashMap<u64, Arc<User>>,
36 update_contacts_tx: watch::Sender<Option<proto::UpdateContacts>>,
37 current_user: watch::Receiver<Option<Arc<User>>>,
38 contacts: Arc<[Contact]>,
39 client: Weak<Client>,
40 http: Arc<dyn HttpClient>,
41 _maintain_contacts: Task<()>,
42 _maintain_current_user: Task<()>,
43}
44
45pub enum Event {}
46
47impl Entity for UserStore {
48 type Event = Event;
49}
50
51impl UserStore {
52 pub fn new(
53 client: Arc<Client>,
54 http: Arc<dyn HttpClient>,
55 cx: &mut ModelContext<Self>,
56 ) -> Self {
57 let (mut current_user_tx, current_user_rx) = watch::channel();
58 let (update_contacts_tx, mut update_contacts_rx) =
59 watch::channel::<Option<proto::UpdateContacts>>();
60 let rpc_subscription =
61 client.add_message_handler(cx.handle(), Self::handle_update_contacts);
62 Self {
63 users: Default::default(),
64 current_user: current_user_rx,
65 contacts: Arc::from([]),
66 client: Arc::downgrade(&client),
67 update_contacts_tx,
68 http,
69 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
70 let _subscription = rpc_subscription;
71 while let Some(message) = update_contacts_rx.recv().await {
72 if let Some((message, this)) = message.zip(this.upgrade(&cx)) {
73 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
74 .log_err()
75 .await;
76 }
77 }
78 }),
79 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
80 let mut status = client.status();
81 while let Some(status) = status.recv().await {
82 match status {
83 Status::Connected { .. } => {
84 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
85 let user = this
86 .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
87 .log_err()
88 .await;
89 current_user_tx.send(user).await.ok();
90 }
91 }
92 Status::SignedOut => {
93 current_user_tx.send(None).await.ok();
94 }
95 _ => {}
96 }
97 }
98 }),
99 }
100 }
101
102 async fn handle_update_contacts(
103 this: ModelHandle<Self>,
104 msg: TypedEnvelope<proto::UpdateContacts>,
105 _: Arc<Client>,
106 mut cx: AsyncAppContext,
107 ) -> Result<()> {
108 this.update(&mut cx, |this, _| {
109 *this.update_contacts_tx.borrow_mut() = Some(msg.payload);
110 });
111 Ok(())
112 }
113
114 fn update_contacts(
115 &mut self,
116 message: proto::UpdateContacts,
117 cx: &mut ModelContext<Self>,
118 ) -> Task<Result<()>> {
119 let mut user_ids = HashSet::new();
120 for contact in &message.contacts {
121 user_ids.insert(contact.user_id);
122 user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
123 }
124
125 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
126 cx.spawn(|this, mut cx| async move {
127 load_users.await?;
128
129 let mut contacts = Vec::new();
130 for contact in message.contacts {
131 contacts.push(Contact::from_proto(contact, &this, &mut cx).await?);
132 }
133
134 this.update(&mut cx, |this, cx| {
135 contacts.sort_by(|a, b| a.user.github_login.cmp(&b.user.github_login));
136 this.contacts = contacts.into();
137 cx.notify();
138 });
139
140 Ok(())
141 })
142 }
143
144 pub fn contacts(&self) -> &Arc<[Contact]> {
145 &self.contacts
146 }
147
148 pub fn has_contact(&self, user: &Arc<User>) -> bool {
149 self.contacts
150 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
151 .is_ok()
152 }
153
154 pub fn get_users(
155 &mut self,
156 mut user_ids: Vec<u64>,
157 cx: &mut ModelContext<Self>,
158 ) -> Task<Result<Vec<Arc<User>>>> {
159 user_ids.retain(|id| !self.users.contains_key(id));
160 self.load_users(proto::GetUsers { user_ids }, cx)
161 }
162
163 pub fn fuzzy_search_users(
164 &mut self,
165 query: String,
166 cx: &mut ModelContext<Self>,
167 ) -> Task<Result<Vec<Arc<User>>>> {
168 self.load_users(proto::FuzzySearchUsers { query }, cx)
169 }
170
171 pub fn fetch_user(
172 &mut self,
173 user_id: u64,
174 cx: &mut ModelContext<Self>,
175 ) -> Task<Result<Arc<User>>> {
176 if let Some(user) = self.users.get(&user_id).cloned() {
177 return cx.foreground().spawn(async move { Ok(user) });
178 }
179
180 let load_users = self.get_users(vec![user_id], cx);
181 cx.spawn(|this, mut cx| async move {
182 load_users.await?;
183 this.update(&mut cx, |this, _| {
184 this.users
185 .get(&user_id)
186 .cloned()
187 .ok_or_else(|| anyhow!("server responded with no users"))
188 })
189 })
190 }
191
192 pub fn current_user(&self) -> Option<Arc<User>> {
193 self.current_user.borrow().clone()
194 }
195
196 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
197 self.current_user.clone()
198 }
199
200 fn load_users(
201 &mut self,
202 request: impl RequestMessage<Response = UsersResponse>,
203 cx: &mut ModelContext<Self>,
204 ) -> Task<Result<Vec<Arc<User>>>> {
205 let client = self.client.clone();
206 let http = self.http.clone();
207 cx.spawn_weak(|this, mut cx| async move {
208 if let Some(rpc) = client.upgrade() {
209 let response = rpc.request(request).await?;
210 let users = future::join_all(
211 response
212 .users
213 .into_iter()
214 .map(|user| User::new(user, http.as_ref())),
215 )
216 .await;
217
218 if let Some(this) = this.upgrade(&cx) {
219 this.update(&mut cx, |this, _| {
220 for user in &users {
221 this.users.insert(user.id, user.clone());
222 }
223 });
224 }
225 Ok(users)
226 } else {
227 Ok(Vec::new())
228 }
229 })
230 }
231}
232
233impl User {
234 async fn new(message: proto::User, http: &dyn HttpClient) -> Arc<Self> {
235 Arc::new(User {
236 id: message.id,
237 github_login: message.github_login,
238 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
239 })
240 }
241}
242
243impl Contact {
244 async fn from_proto(
245 contact: proto::Contact,
246 user_store: &ModelHandle<UserStore>,
247 cx: &mut AsyncAppContext,
248 ) -> Result<Self> {
249 let user = user_store
250 .update(cx, |user_store, cx| {
251 user_store.fetch_user(contact.user_id, cx)
252 })
253 .await?;
254 let mut projects = Vec::new();
255 for project in contact.projects {
256 let mut guests = Vec::new();
257 for participant_id in project.guests {
258 guests.push(
259 user_store
260 .update(cx, |user_store, cx| {
261 user_store.fetch_user(participant_id, cx)
262 })
263 .await?,
264 );
265 }
266 projects.push(ProjectMetadata {
267 id: project.id,
268 worktree_root_names: project.worktree_root_names.clone(),
269 is_shared: project.is_shared,
270 guests,
271 });
272 }
273 Ok(Self { user, projects })
274 }
275}
276
277async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
278 let mut response = http
279 .get(url, Default::default(), true)
280 .await
281 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
282
283 if !response.status().is_success() {
284 return Err(anyhow!("avatar request failed {:?}", response.status()));
285 }
286
287 let mut body = Vec::new();
288 response
289 .body_mut()
290 .read_to_end(&mut body)
291 .await
292 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
293 let format = image::guess_format(&body)?;
294 let image = image::load_from_memory_with_format(&body, format)?.into_bgra8();
295 Ok(ImageData::new(image))
296}