1use crate::Status;
2use crate::{Client, Subscription, User, UserStore};
3use anyhow::anyhow;
4use anyhow::Result;
5use collections::HashMap;
6use collections::HashSet;
7use futures::Future;
8use futures::StreamExt;
9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, Task};
10use rpc::{proto, TypedEnvelope};
11use std::sync::Arc;
12
/// Identifier of a channel, as carried in the `proto` channel messages.
pub type ChannelId = u64;
/// Identifier of a user, as carried in the `proto` channel messages.
pub type UserId = u64;
15
16pub struct ChannelStore {
17 channels: Vec<Arc<Channel>>,
18 channel_invitations: Vec<Arc<Channel>>,
19 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
20 channels_with_admin_privileges: HashSet<ChannelId>,
21 outgoing_invites: HashSet<(ChannelId, UserId)>,
22 client: Arc<Client>,
23 user_store: ModelHandle<UserStore>,
24 _rpc_subscription: Subscription,
25 _watch_connection_status: Task<()>,
26}
27
28#[derive(Clone, Debug, PartialEq)]
29pub struct Channel {
30 pub id: ChannelId,
31 pub name: String,
32 pub parent_id: Option<ChannelId>,
33 pub depth: usize,
34}
35
36pub struct ChannelMembership {
37 pub user: Arc<User>,
38 pub kind: proto::channel_member::Kind,
39 pub admin: bool,
40}
41
42pub enum ChannelEvent {
43 ChannelCreated(ChannelId),
44 ChannelRenamed(ChannelId),
45}
46
47impl Entity for ChannelStore {
48 type Event = ChannelEvent;
49}
50
/// Relationship of a user to a channel.
///
/// NOTE(review): not referenced elsewhere in this file; presumably consumed by UI code.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ChannelMemberStatus {
    /// The user has been invited.
    Invited,
    /// The user is a member.
    Member,
    /// The user is neither invited nor a member.
    NotMember,
}
56
57impl ChannelStore {
58 pub fn new(
59 client: Arc<Client>,
60 user_store: ModelHandle<UserStore>,
61 cx: &mut ModelContext<Self>,
62 ) -> Self {
63 let rpc_subscription =
64 client.add_message_handler(cx.handle(), Self::handle_update_channels);
65
66 let mut connection_status = client.status();
67 let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
68 while let Some(status) = connection_status.next().await {
69 if matches!(status, Status::ConnectionLost | Status::SignedOut) {
70 if let Some(this) = this.upgrade(&cx) {
71 this.update(&mut cx, |this, cx| {
72 this.channels.clear();
73 this.channel_invitations.clear();
74 this.channel_participants.clear();
75 this.channels_with_admin_privileges.clear();
76 this.outgoing_invites.clear();
77 cx.notify();
78 });
79 } else {
80 break;
81 }
82 }
83 }
84 });
85 Self {
86 channels: vec![],
87 channel_invitations: vec![],
88 channel_participants: Default::default(),
89 channels_with_admin_privileges: Default::default(),
90 outgoing_invites: Default::default(),
91 client,
92 user_store,
93 _rpc_subscription: rpc_subscription,
94 _watch_connection_status: watch_connection_status,
95 }
96 }
97
98 pub fn channels(&self) -> &[Arc<Channel>] {
99 &self.channels
100 }
101
102 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
103 &self.channel_invitations
104 }
105
106 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<Arc<Channel>> {
107 self.channels.iter().find(|c| c.id == channel_id).cloned()
108 }
109
110 pub fn is_user_admin(&self, mut channel_id: ChannelId) -> bool {
111 loop {
112 if self.channels_with_admin_privileges.contains(&channel_id) {
113 return true;
114 }
115 if let Some(channel) = self.channel_for_id(channel_id) {
116 if let Some(parent_id) = channel.parent_id {
117 channel_id = parent_id;
118 continue;
119 }
120 }
121 return false;
122 }
123 }
124
125 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
126 self.channel_participants
127 .get(&channel_id)
128 .map_or(&[], |v| v.as_slice())
129 }
130
131 pub fn create_channel(
132 &self,
133 name: &str,
134 parent_id: Option<ChannelId>,
135 cx: &mut ModelContext<Self>,
136 ) -> Task<Result<ChannelId>> {
137 let client = self.client.clone();
138 let name = name.trim_start_matches("#").to_owned();
139 cx.spawn(|this, mut cx| async move {
140 let channel = client
141 .request(proto::CreateChannel { name, parent_id })
142 .await?
143 .channel
144 .ok_or_else(|| anyhow!("missing channel in response"))?;
145
146 let channel_id = channel.id;
147
148 this.update(&mut cx, |this, cx| {
149 this.update_channels(
150 proto::UpdateChannels {
151 channels: vec![channel],
152 ..Default::default()
153 },
154 cx,
155 );
156
157 // This event is emitted because the collab panel wants to clear the pending edit state
158 // before this frame is rendered. But we can't guarantee that the collab panel's future
159 // will resolve before this flush_effects finishes. Synchronously emitting this event
160 // ensures that the collab panel will observe this creation before the frame completes
161 cx.emit(ChannelEvent::ChannelCreated(channel_id));
162 });
163
164 Ok(channel_id)
165 })
166 }
167
168 pub fn invite_member(
169 &mut self,
170 channel_id: ChannelId,
171 user_id: UserId,
172 admin: bool,
173 cx: &mut ModelContext<Self>,
174 ) -> Task<Result<()>> {
175 if !self.outgoing_invites.insert((channel_id, user_id)) {
176 return Task::ready(Err(anyhow!("invite request already in progress")));
177 }
178
179 cx.notify();
180 let client = self.client.clone();
181 cx.spawn(|this, mut cx| async move {
182 let result = client
183 .request(proto::InviteChannelMember {
184 channel_id,
185 user_id,
186 admin,
187 })
188 .await;
189
190 this.update(&mut cx, |this, cx| {
191 this.outgoing_invites.remove(&(channel_id, user_id));
192 cx.notify();
193 });
194
195 result?;
196
197 Ok(())
198 })
199 }
200
201 pub fn remove_member(
202 &mut self,
203 channel_id: ChannelId,
204 user_id: u64,
205 cx: &mut ModelContext<Self>,
206 ) -> Task<Result<()>> {
207 if !self.outgoing_invites.insert((channel_id, user_id)) {
208 return Task::ready(Err(anyhow!("invite request already in progress")));
209 }
210
211 cx.notify();
212 let client = self.client.clone();
213 cx.spawn(|this, mut cx| async move {
214 let result = client
215 .request(proto::RemoveChannelMember {
216 channel_id,
217 user_id,
218 })
219 .await;
220
221 this.update(&mut cx, |this, cx| {
222 this.outgoing_invites.remove(&(channel_id, user_id));
223 cx.notify();
224 });
225 result?;
226 Ok(())
227 })
228 }
229
230 pub fn set_member_admin(
231 &mut self,
232 channel_id: ChannelId,
233 user_id: UserId,
234 admin: bool,
235 cx: &mut ModelContext<Self>,
236 ) -> Task<Result<()>> {
237 if !self.outgoing_invites.insert((channel_id, user_id)) {
238 return Task::ready(Err(anyhow!("member request already in progress")));
239 }
240
241 cx.notify();
242 let client = self.client.clone();
243 cx.spawn(|this, mut cx| async move {
244 let result = client
245 .request(proto::SetChannelMemberAdmin {
246 channel_id,
247 user_id,
248 admin,
249 })
250 .await;
251
252 this.update(&mut cx, |this, cx| {
253 this.outgoing_invites.remove(&(channel_id, user_id));
254 cx.notify();
255 });
256
257 result?;
258 Ok(())
259 })
260 }
261
262 pub fn rename(
263 &mut self,
264 channel_id: ChannelId,
265 new_name: &str,
266 cx: &mut ModelContext<Self>,
267 ) -> Task<Result<()>> {
268 let client = self.client.clone();
269 let name = new_name.to_string();
270 cx.spawn(|this, mut cx| async move {
271 let channel = client
272 .request(proto::RenameChannel { channel_id, name })
273 .await?
274 .channel
275 .ok_or_else(|| anyhow!("missing channel in response"))?;
276 this.update(&mut cx, |this, cx| {
277 this.update_channels(
278 proto::UpdateChannels {
279 channels: vec![channel],
280 ..Default::default()
281 },
282 cx,
283 );
284
285 // This event is emitted because the collab panel wants to clear the pending edit state
286 // before this frame is rendered. But we can't guarantee that the collab panel's future
287 // will resolve before this flush_effects finishes. Synchronously emitting this event
288 // ensures that the collab panel will observe this creation before the frame complete
289 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
290 });
291 Ok(())
292 })
293 }
294
295 pub fn respond_to_channel_invite(
296 &mut self,
297 channel_id: ChannelId,
298 accept: bool,
299 ) -> impl Future<Output = Result<()>> {
300 let client = self.client.clone();
301 async move {
302 client
303 .request(proto::RespondToChannelInvite { channel_id, accept })
304 .await?;
305 Ok(())
306 }
307 }
308
309 pub fn get_channel_member_details(
310 &self,
311 channel_id: ChannelId,
312 cx: &mut ModelContext<Self>,
313 ) -> Task<Result<Vec<ChannelMembership>>> {
314 let client = self.client.clone();
315 let user_store = self.user_store.downgrade();
316 cx.spawn(|_, mut cx| async move {
317 let response = client
318 .request(proto::GetChannelMembers { channel_id })
319 .await?;
320
321 let user_ids = response.members.iter().map(|m| m.user_id).collect();
322 let user_store = user_store
323 .upgrade(&cx)
324 .ok_or_else(|| anyhow!("user store dropped"))?;
325 let users = user_store
326 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
327 .await?;
328
329 Ok(users
330 .into_iter()
331 .zip(response.members)
332 .filter_map(|(user, member)| {
333 Some(ChannelMembership {
334 user,
335 admin: member.admin,
336 kind: proto::channel_member::Kind::from_i32(member.kind)?,
337 })
338 })
339 .collect())
340 })
341 }
342
343 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
344 let client = self.client.clone();
345 async move {
346 client.request(proto::RemoveChannel { channel_id }).await?;
347 Ok(())
348 }
349 }
350
351 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
352 false
353 }
354
355 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
356 self.outgoing_invites.contains(&(channel_id, user_id))
357 }
358
359 async fn handle_update_channels(
360 this: ModelHandle<Self>,
361 message: TypedEnvelope<proto::UpdateChannels>,
362 _: Arc<Client>,
363 mut cx: AsyncAppContext,
364 ) -> Result<()> {
365 this.update(&mut cx, |this, cx| {
366 this.update_channels(message.payload, cx);
367 });
368 Ok(())
369 }
370
371 pub(crate) fn update_channels(
372 &mut self,
373 payload: proto::UpdateChannels,
374 cx: &mut ModelContext<ChannelStore>,
375 ) {
376 self.channels
377 .retain(|channel| !payload.remove_channels.contains(&channel.id));
378 self.channel_invitations
379 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
380 self.channel_participants
381 .retain(|channel_id, _| !payload.remove_channels.contains(channel_id));
382 self.channels_with_admin_privileges
383 .retain(|channel_id| !payload.remove_channels.contains(channel_id));
384
385 for channel in payload.channel_invitations {
386 if let Some(existing_channel) = self
387 .channel_invitations
388 .iter_mut()
389 .find(|c| c.id == channel.id)
390 {
391 let existing_channel = Arc::make_mut(existing_channel);
392 existing_channel.name = channel.name;
393 continue;
394 }
395
396 self.channel_invitations.insert(
397 0,
398 Arc::new(Channel {
399 id: channel.id,
400 name: channel.name,
401 parent_id: None,
402 depth: 0,
403 }),
404 );
405 }
406
407 for channel in payload.channels {
408 if let Some(existing_channel) = self.channels.iter_mut().find(|c| c.id == channel.id) {
409 let existing_channel = Arc::make_mut(existing_channel);
410 existing_channel.name = channel.name;
411 continue;
412 }
413
414 if let Some(parent_id) = channel.parent_id {
415 if let Some(ix) = self.channels.iter().position(|c| c.id == parent_id) {
416 let parent_channel = &self.channels[ix];
417 let depth = parent_channel.depth + 1;
418 self.channels.insert(
419 ix + 1,
420 Arc::new(Channel {
421 id: channel.id,
422 name: channel.name,
423 parent_id: Some(parent_id),
424 depth,
425 }),
426 );
427 }
428 } else {
429 self.channels.insert(
430 0,
431 Arc::new(Channel {
432 id: channel.id,
433 name: channel.name,
434 parent_id: None,
435 depth: 0,
436 }),
437 );
438 }
439 }
440
441 for permission in payload.channel_permissions {
442 if permission.is_admin {
443 self.channels_with_admin_privileges
444 .insert(permission.channel_id);
445 } else {
446 self.channels_with_admin_privileges
447 .remove(&permission.channel_id);
448 }
449 }
450
451 let mut all_user_ids = Vec::new();
452 let channel_participants = payload.channel_participants;
453 for entry in &channel_participants {
454 for user_id in entry.participant_user_ids.iter() {
455 if let Err(ix) = all_user_ids.binary_search(user_id) {
456 all_user_ids.insert(ix, *user_id);
457 }
458 }
459 }
460
461 // TODO: Race condition if an update channels messages comes in while resolving avatars
462 let users = self
463 .user_store
464 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
465 cx.spawn(|this, mut cx| async move {
466 let users = users.await?;
467
468 this.update(&mut cx, |this, cx| {
469 for entry in &channel_participants {
470 let mut participants: Vec<_> = entry
471 .participant_user_ids
472 .iter()
473 .filter_map(|user_id| {
474 users
475 .binary_search_by_key(&user_id, |user| &user.id)
476 .ok()
477 .map(|ix| users[ix].clone())
478 })
479 .collect();
480
481 participants.sort_by_key(|u| u.id);
482
483 this.channel_participants
484 .insert(entry.channel_id, participants);
485 }
486
487 cx.notify();
488 });
489 anyhow::Ok(())
490 })
491 .detach();
492
493 cx.notify();
494 }
495}