1use super::{
2 http::{HttpClient, Method, Request, Url},
3 proto, Client, Status, TypedEnvelope,
4};
5use anyhow::{anyhow, Context, Result};
6use futures::future;
7use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
8use postage::{prelude::Stream, sink::Sink, watch};
9use std::{
10 collections::{HashMap, HashSet},
11 sync::Arc,
12};
13use util::TryFutureExt as _;
14
15#[derive(Debug)]
16pub struct User {
17 pub id: u64,
18 pub github_login: String,
19 pub avatar: Option<Arc<ImageData>>,
20}
21
22#[derive(Debug)]
23pub struct Contact {
24 pub user: Arc<User>,
25 pub projects: Vec<ProjectMetadata>,
26}
27
28#[derive(Debug)]
29pub struct ProjectMetadata {
30 pub id: u64,
31 pub is_shared: bool,
32 pub worktree_root_names: Vec<String>,
33 pub guests: Vec<Arc<User>>,
34}
35
36pub struct UserStore {
37 users: HashMap<u64, Arc<User>>,
38 current_user: watch::Receiver<Option<Arc<User>>>,
39 contacts: Arc<[Contact]>,
40 client: Arc<Client>,
41 http: Arc<dyn HttpClient>,
42 _maintain_contacts: Task<()>,
43 _maintain_current_user: Task<()>,
44}
45
/// Event type of [`UserStore`]. It has no variants, so no value of it can exist.
pub enum Event {}
47
48impl Entity for UserStore {
49 type Event = Event;
50}
51
52impl UserStore {
53 pub fn new(
54 client: Arc<Client>,
55 http: Arc<dyn HttpClient>,
56 cx: &mut ModelContext<Self>,
57 ) -> Self {
58 let (mut current_user_tx, current_user_rx) = watch::channel();
59 let (mut update_contacts_tx, mut update_contacts_rx) =
60 watch::channel::<Option<proto::UpdateContacts>>();
61 let update_contacts_subscription = client.add_message_handler(
62 cx,
63 move |_: ModelHandle<Self>, msg: TypedEnvelope<proto::UpdateContacts>, _, _| {
64 *update_contacts_tx.borrow_mut() = Some(msg.payload);
65 async move { Ok(()) }
66 },
67 );
68 Self {
69 users: Default::default(),
70 current_user: current_user_rx,
71 contacts: Arc::from([]),
72 client: client.clone(),
73 http,
74 _maintain_contacts: cx.spawn_weak(|this, mut cx| async move {
75 let _subscription = update_contacts_subscription;
76 while let Some(message) = update_contacts_rx.recv().await {
77 if let Some((message, this)) = message.zip(this.upgrade(&cx)) {
78 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
79 .log_err()
80 .await;
81 }
82 }
83 }),
84 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
85 let mut status = client.status();
86 while let Some(status) = status.recv().await {
87 match status {
88 Status::Connected { .. } => {
89 if let Some((this, user_id)) = this.upgrade(&cx).zip(client.user_id()) {
90 let user = this
91 .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
92 .log_err()
93 .await;
94 current_user_tx.send(user).await.ok();
95 }
96 }
97 Status::SignedOut => {
98 current_user_tx.send(None).await.ok();
99 }
100 _ => {}
101 }
102 }
103 }),
104 }
105 }
106
107 fn update_contacts(
108 &mut self,
109 message: proto::UpdateContacts,
110 cx: &mut ModelContext<Self>,
111 ) -> Task<Result<()>> {
112 let mut user_ids = HashSet::new();
113 for contact in &message.contacts {
114 user_ids.insert(contact.user_id);
115 user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied());
116 }
117
118 let load_users = self.load_users(user_ids.into_iter().collect(), cx);
119 cx.spawn(|this, mut cx| async move {
120 load_users.await?;
121
122 let mut contacts = Vec::new();
123 for contact in message.contacts {
124 contacts.push(Contact::from_proto(contact, &this, &mut cx).await?);
125 }
126
127 this.update(&mut cx, |this, cx| {
128 contacts.sort_by(|a, b| a.user.github_login.cmp(&b.user.github_login));
129 this.contacts = contacts.into();
130 cx.notify();
131 });
132
133 Ok(())
134 })
135 }
136
137 pub fn contacts(&self) -> &Arc<[Contact]> {
138 &self.contacts
139 }
140
141 pub fn load_users(
142 &mut self,
143 mut user_ids: Vec<u64>,
144 cx: &mut ModelContext<Self>,
145 ) -> Task<Result<()>> {
146 let rpc = self.client.clone();
147 let http = self.http.clone();
148 user_ids.retain(|id| !self.users.contains_key(id));
149 cx.spawn_weak(|this, mut cx| async move {
150 if !user_ids.is_empty() {
151 let response = rpc.request(proto::GetUsers { user_ids }).await?;
152 let new_users = future::join_all(
153 response
154 .users
155 .into_iter()
156 .map(|user| User::new(user, http.as_ref())),
157 )
158 .await;
159
160 if let Some(this) = this.upgrade(&cx) {
161 this.update(&mut cx, |this, _| {
162 for user in new_users {
163 this.users.insert(user.id, Arc::new(user));
164 }
165 });
166 }
167 }
168
169 Ok(())
170 })
171 }
172
173 pub fn fetch_user(
174 &mut self,
175 user_id: u64,
176 cx: &mut ModelContext<Self>,
177 ) -> Task<Result<Arc<User>>> {
178 if let Some(user) = self.users.get(&user_id).cloned() {
179 return cx.spawn_weak(|_, _| async move { Ok(user) });
180 }
181
182 let load_users = self.load_users(vec![user_id], cx);
183 cx.spawn(|this, mut cx| async move {
184 load_users.await?;
185 this.update(&mut cx, |this, _| {
186 this.users
187 .get(&user_id)
188 .cloned()
189 .ok_or_else(|| anyhow!("server responded with no users"))
190 })
191 })
192 }
193
194 pub fn current_user(&self) -> Option<Arc<User>> {
195 self.current_user.borrow().clone()
196 }
197
198 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
199 self.current_user.clone()
200 }
201}
202
203impl User {
204 async fn new(message: proto::User, http: &dyn HttpClient) -> Self {
205 User {
206 id: message.id,
207 github_login: message.github_login,
208 avatar: fetch_avatar(http, &message.avatar_url).warn_on_err().await,
209 }
210 }
211}
212
213impl Contact {
214 async fn from_proto(
215 contact: proto::Contact,
216 user_store: &ModelHandle<UserStore>,
217 cx: &mut AsyncAppContext,
218 ) -> Result<Self> {
219 let user = user_store
220 .update(cx, |user_store, cx| {
221 user_store.fetch_user(contact.user_id, cx)
222 })
223 .await?;
224 let mut projects = Vec::new();
225 for project in contact.projects {
226 let mut guests = Vec::new();
227 for participant_id in project.guests {
228 guests.push(
229 user_store
230 .update(cx, |user_store, cx| {
231 user_store.fetch_user(participant_id, cx)
232 })
233 .await?,
234 );
235 }
236 projects.push(ProjectMetadata {
237 id: project.id,
238 worktree_root_names: project.worktree_root_names.clone(),
239 is_shared: project.is_shared,
240 guests,
241 });
242 }
243 Ok(Self { user, projects })
244 }
245}
246
247async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
248 let url = Url::parse(url).with_context(|| format!("failed to parse avatar url {:?}", url))?;
249 let mut request = Request::new(Method::Get, url);
250 request.middleware(surf::middleware::Redirect::default());
251
252 let mut response = http
253 .send(request)
254 .await
255 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
256 if !response.status().is_success() {
257 return Err(anyhow!("avatar request failed {:?}", response.status()));
258 }
259 let bytes = response
260 .body_bytes()
261 .await
262 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
263 let format = image::guess_format(&bytes)?;
264 let image = image::load_from_memory_with_format(&bytes, format)?.into_bgra8();
265 Ok(ImageData::new(image))
266}