1use crate::{
2 http::{HttpClient, Method, Request, Url},
3 rpc::{Client, Status},
4 util::TryFutureExt,
5};
6use anyhow::{anyhow, Context, Result};
7use futures::future;
8use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
9use postage::{prelude::Stream, sink::Sink, watch};
10use std::{
11 collections::{HashMap, HashSet},
12 sync::Arc,
13};
14use zrpc::{proto, TypedEnvelope};
15
16#[derive(Debug)]
17pub struct User {
18 pub id: u64,
19 pub github_login: String,
20 pub avatar: Option<Arc<ImageData>>,
21}
22
23#[derive(Debug)]
24pub struct Collaborator {
25 pub user: Arc<User>,
26 pub worktrees: Vec<WorktreeMetadata>,
27}
28
29#[derive(Debug)]
30pub struct WorktreeMetadata {
31 pub id: u64,
32 pub root_name: String,
33 pub is_shared: bool,
34 pub guests: Vec<Arc<User>>,
35}
36
37pub struct UserStore {
38 users: HashMap<u64, Arc<User>>,
39 current_user: watch::Receiver<Option<Arc<User>>>,
40 collaborators: Arc<[Collaborator]>,
41 rpc: Arc<Client>,
42 http: Arc<dyn HttpClient>,
43 _maintain_collaborators: Task<()>,
44 _maintain_current_user: Task<()>,
45}
46
47pub enum Event {}
48
49impl Entity for UserStore {
50 type Event = Event;
51}
52
53impl UserStore {
54 pub fn new(rpc: Arc<Client>, http: Arc<dyn HttpClient>, cx: &mut ModelContext<Self>) -> Self {
55 let (mut current_user_tx, current_user_rx) = watch::channel();
56 let (mut update_collaborators_tx, mut update_collaborators_rx) =
57 watch::channel::<Option<proto::UpdateCollaborators>>();
58 let update_collaborators_subscription = rpc.subscribe(
59 cx,
60 move |_: &mut Self, msg: TypedEnvelope<proto::UpdateCollaborators>, _, _| {
61 let _ = update_collaborators_tx.blocking_send(Some(msg.payload));
62 Ok(())
63 },
64 );
65 Self {
66 users: Default::default(),
67 current_user: current_user_rx,
68 collaborators: Arc::from([]),
69 rpc: rpc.clone(),
70 http,
71 _maintain_collaborators: cx.spawn_weak(|this, mut cx| async move {
72 let _subscription = update_collaborators_subscription;
73 while let Some(message) = update_collaborators_rx.recv().await {
74 if let Some((message, this)) = message.zip(this.upgrade(&cx)) {
75 this.update(&mut cx, |this, cx| this.update_collaborators(message, cx))
76 .log_err()
77 .await;
78 }
79 }
80 }),
81 _maintain_current_user: cx.spawn_weak(|this, mut cx| async move {
82 let mut status = rpc.status();
83 while let Some(status) = status.recv().await {
84 match status {
85 Status::Connected { .. } => {
86 if let Some((this, user_id)) = this.upgrade(&cx).zip(rpc.user_id()) {
87 let user = this
88 .update(&mut cx, |this, cx| this.fetch_user(user_id, cx))
89 .log_err()
90 .await;
91 current_user_tx.send(user).await.ok();
92 }
93 }
94 Status::SignedOut => {
95 current_user_tx.send(None).await.ok();
96 }
97 _ => {}
98 }
99 }
100 }),
101 }
102 }
103
104 fn update_collaborators(
105 &mut self,
106 message: proto::UpdateCollaborators,
107 cx: &mut ModelContext<Self>,
108 ) -> Task<Result<()>> {
109 let mut user_ids = HashSet::new();
110 for collaborator in &message.collaborators {
111 user_ids.insert(collaborator.user_id);
112 user_ids.extend(
113 collaborator
114 .worktrees
115 .iter()
116 .flat_map(|w| &w.guests)
117 .copied(),
118 );
119 }
120
121 let load_users = self.load_users(user_ids.into_iter().collect(), cx);
122 cx.spawn(|this, mut cx| async move {
123 load_users.await?;
124
125 let mut collaborators = Vec::new();
126 for collaborator in message.collaborators {
127 collaborators.push(Collaborator::from_proto(collaborator, &this, &mut cx).await?);
128 }
129
130 this.update(&mut cx, |this, cx| {
131 collaborators.sort_by(|a, b| a.user.github_login.cmp(&b.user.github_login));
132 this.collaborators = collaborators.into();
133 cx.notify();
134 });
135
136 Ok(())
137 })
138 }
139
140 pub fn collaborators(&self) -> &Arc<[Collaborator]> {
141 &self.collaborators
142 }
143
144 pub fn load_users(
145 &mut self,
146 mut user_ids: Vec<u64>,
147 cx: &mut ModelContext<Self>,
148 ) -> Task<Result<()>> {
149 let rpc = self.rpc.clone();
150 let http = self.http.clone();
151 user_ids.retain(|id| !self.users.contains_key(id));
152 cx.spawn_weak(|this, mut cx| async move {
153 if !user_ids.is_empty() {
154 let response = rpc.request(proto::GetUsers { user_ids }).await?;
155 let new_users = future::join_all(
156 response
157 .users
158 .into_iter()
159 .map(|user| User::new(user, http.as_ref())),
160 )
161 .await;
162
163 if let Some(this) = this.upgrade(&cx) {
164 this.update(&mut cx, |this, _| {
165 for user in new_users {
166 this.users.insert(user.id, Arc::new(user));
167 }
168 });
169 }
170 }
171
172 Ok(())
173 })
174 }
175
176 pub fn fetch_user(
177 &mut self,
178 user_id: u64,
179 cx: &mut ModelContext<Self>,
180 ) -> Task<Result<Arc<User>>> {
181 if let Some(user) = self.users.get(&user_id).cloned() {
182 return cx.spawn_weak(|_, _| async move { Ok(user) });
183 }
184
185 let load_users = self.load_users(vec![user_id], cx);
186 cx.spawn(|this, mut cx| async move {
187 load_users.await?;
188 this.update(&mut cx, |this, _| {
189 this.users
190 .get(&user_id)
191 .cloned()
192 .ok_or_else(|| anyhow!("server responded with no users"))
193 })
194 })
195 }
196
197 pub fn current_user(&self) -> Option<Arc<User>> {
198 self.current_user.borrow().clone()
199 }
200
201 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
202 self.current_user.clone()
203 }
204}
205
206impl User {
207 async fn new(message: proto::User, http: &dyn HttpClient) -> Self {
208 User {
209 id: message.id,
210 github_login: message.github_login,
211 avatar: fetch_avatar(http, &message.avatar_url).log_err().await,
212 }
213 }
214}
215
216impl Collaborator {
217 async fn from_proto(
218 collaborator: proto::Collaborator,
219 user_store: &ModelHandle<UserStore>,
220 cx: &mut AsyncAppContext,
221 ) -> Result<Self> {
222 let user = user_store
223 .update(cx, |user_store, cx| {
224 user_store.fetch_user(collaborator.user_id, cx)
225 })
226 .await?;
227 let mut worktrees = Vec::new();
228 for worktree in collaborator.worktrees {
229 let mut guests = Vec::new();
230 for participant_id in worktree.guests {
231 guests.push(
232 user_store
233 .update(cx, |user_store, cx| {
234 user_store.fetch_user(participant_id, cx)
235 })
236 .await?,
237 );
238 }
239 worktrees.push(WorktreeMetadata {
240 id: worktree.id,
241 root_name: worktree.root_name,
242 is_shared: worktree.is_shared,
243 guests,
244 });
245 }
246 Ok(Self { user, worktrees })
247 }
248}
249
250async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result<Arc<ImageData>> {
251 let url = Url::parse(url).with_context(|| format!("failed to parse avatar url {:?}", url))?;
252 let mut request = Request::new(Method::Get, url);
253 request.middleware(surf::middleware::Redirect::default());
254
255 let mut response = http
256 .send(request)
257 .await
258 .map_err(|e| anyhow!("failed to send user avatar request: {}", e))?;
259 let bytes = response
260 .body_bytes()
261 .await
262 .map_err(|e| anyhow!("failed to read user avatar response body: {}", e))?;
263 let format = image::guess_format(&bytes)?;
264 let image = image::load_from_memory_with_format(&bytes, format)?.into_bgra8();
265 Ok(ImageData::new(image))
266}