1use crate::Status;
2use crate::{Client, Subscription, User, UserStore};
3use anyhow::anyhow;
4use anyhow::Result;
5use collections::HashMap;
6use collections::HashSet;
7use futures::channel::mpsc;
8use futures::Future;
9use futures::StreamExt;
10use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, Task};
11use rpc::{proto, TypedEnvelope};
12use std::sync::Arc;
13use util::ResultExt;
14
/// Numeric identifier of a channel.
pub type ChannelId = u64;
/// Numeric identifier of a user.
pub type UserId = u64;
17
18pub struct ChannelStore {
19 channels_by_id: HashMap<ChannelId, Arc<Channel>>,
20 channel_paths: Vec<Vec<ChannelId>>,
21 channel_invitations: Vec<Arc<Channel>>,
22 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
23 channels_with_admin_privileges: HashSet<ChannelId>,
24 outgoing_invites: HashSet<(ChannelId, UserId)>,
25 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
26 client: Arc<Client>,
27 user_store: ModelHandle<UserStore>,
28 _rpc_subscription: Subscription,
29 _watch_connection_status: Task<()>,
30 _update_channels: Task<()>,
31}
32
33#[derive(Clone, Debug, PartialEq)]
34pub struct Channel {
35 pub id: ChannelId,
36 pub name: String,
37}
38
39pub struct ChannelMembership {
40 pub user: Arc<User>,
41 pub kind: proto::channel_member::Kind,
42 pub admin: bool,
43}
44
45pub enum ChannelEvent {
46 ChannelCreated(ChannelId),
47 ChannelRenamed(ChannelId),
48}
49
50impl Entity for ChannelStore {
51 type Event = ChannelEvent;
52}
53
/// Relationship of a user to a channel.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ChannelMemberStatus {
    /// The user has been invited.
    Invited,
    /// The user is a member.
    Member,
    /// The user is not a member.
    NotMember,
}
59
60impl ChannelStore {
61 pub fn new(
62 client: Arc<Client>,
63 user_store: ModelHandle<UserStore>,
64 cx: &mut ModelContext<Self>,
65 ) -> Self {
66 let rpc_subscription =
67 client.add_message_handler(cx.handle(), Self::handle_update_channels);
68
69 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
70 let mut connection_status = client.status();
71 let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
72 while let Some(status) = connection_status.next().await {
73 if matches!(status, Status::ConnectionLost | Status::SignedOut) {
74 if let Some(this) = this.upgrade(&cx) {
75 this.update(&mut cx, |this, cx| {
76 this.channels_by_id.clear();
77 this.channel_invitations.clear();
78 this.channel_participants.clear();
79 this.channels_with_admin_privileges.clear();
80 this.channel_paths.clear();
81 this.outgoing_invites.clear();
82 cx.notify();
83 });
84 } else {
85 break;
86 }
87 }
88 }
89 });
90 Self {
91 channels_by_id: HashMap::default(),
92 channel_invitations: Vec::default(),
93 channel_paths: Vec::default(),
94 channel_participants: Default::default(),
95 channels_with_admin_privileges: Default::default(),
96 outgoing_invites: Default::default(),
97 update_channels_tx,
98 client,
99 user_store,
100 _rpc_subscription: rpc_subscription,
101 _watch_connection_status: watch_connection_status,
102 _update_channels: cx.spawn_weak(|this, mut cx| async move {
103 while let Some(update_channels) = update_channels_rx.next().await {
104 if let Some(this) = this.upgrade(&cx) {
105 let update_task = this.update(&mut cx, |this, cx| {
106 this.update_channels(update_channels, cx)
107 });
108 if let Some(update_task) = update_task {
109 update_task.await.log_err();
110 }
111 }
112 }
113 }),
114 }
115 }
116
117 pub fn has_children(&self, channel_id: ChannelId) -> bool {
118 self.channel_paths.iter().any(|path| {
119 if let Some(ix) = path.iter().position(|id| *id == channel_id) {
120 path.len() > ix + 1
121 } else {
122 false
123 }
124 })
125 }
126
127 pub fn channel_count(&self) -> usize {
128 self.channel_paths.len()
129 }
130
131 pub fn channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
132 self.channel_paths.iter().map(move |path| {
133 let id = path.last().unwrap();
134 let channel = self.channel_for_id(*id).unwrap();
135 (path.len() - 1, channel)
136 })
137 }
138
139 pub fn channel_at_index(&self, ix: usize) -> Option<(usize, &Arc<Channel>)> {
140 let path = self.channel_paths.get(ix)?;
141 let id = path.last().unwrap();
142 let channel = self.channel_for_id(*id).unwrap();
143 Some((path.len() - 1, channel))
144 }
145
146 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
147 &self.channel_invitations
148 }
149
150 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
151 self.channels_by_id.get(&channel_id)
152 }
153
154 pub fn is_user_admin(&self, channel_id: ChannelId) -> bool {
155 self.channel_paths.iter().any(|path| {
156 if let Some(ix) = path.iter().position(|id| *id == channel_id) {
157 path[..=ix]
158 .iter()
159 .any(|id| self.channels_with_admin_privileges.contains(id))
160 } else {
161 false
162 }
163 })
164 }
165
166 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
167 self.channel_participants
168 .get(&channel_id)
169 .map_or(&[], |v| v.as_slice())
170 }
171
172 pub fn create_channel(
173 &self,
174 name: &str,
175 parent_id: Option<ChannelId>,
176 cx: &mut ModelContext<Self>,
177 ) -> Task<Result<ChannelId>> {
178 let client = self.client.clone();
179 let name = name.trim_start_matches("#").to_owned();
180 cx.spawn(|this, mut cx| async move {
181 let channel = client
182 .request(proto::CreateChannel { name, parent_id })
183 .await?
184 .channel
185 .ok_or_else(|| anyhow!("missing channel in response"))?;
186
187 let channel_id = channel.id;
188
189 this.update(&mut cx, |this, cx| {
190 let task = this.update_channels(
191 proto::UpdateChannels {
192 channels: vec![channel],
193 ..Default::default()
194 },
195 cx,
196 );
197 assert!(task.is_none());
198
199 // This event is emitted because the collab panel wants to clear the pending edit state
200 // before this frame is rendered. But we can't guarantee that the collab panel's future
201 // will resolve before this flush_effects finishes. Synchronously emitting this event
202 // ensures that the collab panel will observe this creation before the frame completes
203 cx.emit(ChannelEvent::ChannelCreated(channel_id));
204 });
205
206 Ok(channel_id)
207 })
208 }
209
210 pub fn invite_member(
211 &mut self,
212 channel_id: ChannelId,
213 user_id: UserId,
214 admin: bool,
215 cx: &mut ModelContext<Self>,
216 ) -> Task<Result<()>> {
217 if !self.outgoing_invites.insert((channel_id, user_id)) {
218 return Task::ready(Err(anyhow!("invite request already in progress")));
219 }
220
221 cx.notify();
222 let client = self.client.clone();
223 cx.spawn(|this, mut cx| async move {
224 let result = client
225 .request(proto::InviteChannelMember {
226 channel_id,
227 user_id,
228 admin,
229 })
230 .await;
231
232 this.update(&mut cx, |this, cx| {
233 this.outgoing_invites.remove(&(channel_id, user_id));
234 cx.notify();
235 });
236
237 result?;
238
239 Ok(())
240 })
241 }
242
243 pub fn remove_member(
244 &mut self,
245 channel_id: ChannelId,
246 user_id: u64,
247 cx: &mut ModelContext<Self>,
248 ) -> Task<Result<()>> {
249 if !self.outgoing_invites.insert((channel_id, user_id)) {
250 return Task::ready(Err(anyhow!("invite request already in progress")));
251 }
252
253 cx.notify();
254 let client = self.client.clone();
255 cx.spawn(|this, mut cx| async move {
256 let result = client
257 .request(proto::RemoveChannelMember {
258 channel_id,
259 user_id,
260 })
261 .await;
262
263 this.update(&mut cx, |this, cx| {
264 this.outgoing_invites.remove(&(channel_id, user_id));
265 cx.notify();
266 });
267 result?;
268 Ok(())
269 })
270 }
271
272 pub fn set_member_admin(
273 &mut self,
274 channel_id: ChannelId,
275 user_id: UserId,
276 admin: bool,
277 cx: &mut ModelContext<Self>,
278 ) -> Task<Result<()>> {
279 if !self.outgoing_invites.insert((channel_id, user_id)) {
280 return Task::ready(Err(anyhow!("member request already in progress")));
281 }
282
283 cx.notify();
284 let client = self.client.clone();
285 cx.spawn(|this, mut cx| async move {
286 let result = client
287 .request(proto::SetChannelMemberAdmin {
288 channel_id,
289 user_id,
290 admin,
291 })
292 .await;
293
294 this.update(&mut cx, |this, cx| {
295 this.outgoing_invites.remove(&(channel_id, user_id));
296 cx.notify();
297 });
298
299 result?;
300 Ok(())
301 })
302 }
303
304 pub fn rename(
305 &mut self,
306 channel_id: ChannelId,
307 new_name: &str,
308 cx: &mut ModelContext<Self>,
309 ) -> Task<Result<()>> {
310 let client = self.client.clone();
311 let name = new_name.to_string();
312 cx.spawn(|this, mut cx| async move {
313 let channel = client
314 .request(proto::RenameChannel { channel_id, name })
315 .await?
316 .channel
317 .ok_or_else(|| anyhow!("missing channel in response"))?;
318 this.update(&mut cx, |this, cx| {
319 let task = this.update_channels(
320 proto::UpdateChannels {
321 channels: vec![channel],
322 ..Default::default()
323 },
324 cx,
325 );
326 assert!(task.is_none());
327
328 // This event is emitted because the collab panel wants to clear the pending edit state
329 // before this frame is rendered. But we can't guarantee that the collab panel's future
330 // will resolve before this flush_effects finishes. Synchronously emitting this event
331 // ensures that the collab panel will observe this creation before the frame complete
332 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
333 });
334 Ok(())
335 })
336 }
337
338 pub fn respond_to_channel_invite(
339 &mut self,
340 channel_id: ChannelId,
341 accept: bool,
342 ) -> impl Future<Output = Result<()>> {
343 let client = self.client.clone();
344 async move {
345 client
346 .request(proto::RespondToChannelInvite { channel_id, accept })
347 .await?;
348 Ok(())
349 }
350 }
351
352 pub fn get_channel_member_details(
353 &self,
354 channel_id: ChannelId,
355 cx: &mut ModelContext<Self>,
356 ) -> Task<Result<Vec<ChannelMembership>>> {
357 let client = self.client.clone();
358 let user_store = self.user_store.downgrade();
359 cx.spawn(|_, mut cx| async move {
360 let response = client
361 .request(proto::GetChannelMembers { channel_id })
362 .await?;
363
364 let user_ids = response.members.iter().map(|m| m.user_id).collect();
365 let user_store = user_store
366 .upgrade(&cx)
367 .ok_or_else(|| anyhow!("user store dropped"))?;
368 let users = user_store
369 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
370 .await?;
371
372 Ok(users
373 .into_iter()
374 .zip(response.members)
375 .filter_map(|(user, member)| {
376 Some(ChannelMembership {
377 user,
378 admin: member.admin,
379 kind: proto::channel_member::Kind::from_i32(member.kind)?,
380 })
381 })
382 .collect())
383 })
384 }
385
386 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
387 let client = self.client.clone();
388 async move {
389 client.request(proto::RemoveChannel { channel_id }).await?;
390 Ok(())
391 }
392 }
393
394 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
395 false
396 }
397
398 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
399 self.outgoing_invites.contains(&(channel_id, user_id))
400 }
401
402 async fn handle_update_channels(
403 this: ModelHandle<Self>,
404 message: TypedEnvelope<proto::UpdateChannels>,
405 _: Arc<Client>,
406 mut cx: AsyncAppContext,
407 ) -> Result<()> {
408 this.update(&mut cx, |this, _| {
409 this.update_channels_tx
410 .unbounded_send(message.payload)
411 .unwrap();
412 });
413 Ok(())
414 }
415
416 pub(crate) fn update_channels(
417 &mut self,
418 payload: proto::UpdateChannels,
419 cx: &mut ModelContext<ChannelStore>,
420 ) -> Option<Task<Result<()>>> {
421 if !payload.remove_channel_invitations.is_empty() {
422 self.channel_invitations
423 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
424 }
425 for channel in payload.channel_invitations {
426 match self
427 .channel_invitations
428 .binary_search_by_key(&channel.id, |c| c.id)
429 {
430 Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
431 Err(ix) => self.channel_invitations.insert(
432 ix,
433 Arc::new(Channel {
434 id: channel.id,
435 name: channel.name,
436 }),
437 ),
438 }
439 }
440
441 let channels_changed = !payload.channels.is_empty() || !payload.remove_channels.is_empty();
442 if channels_changed {
443 if !payload.remove_channels.is_empty() {
444 self.channels_by_id
445 .retain(|channel_id, _| !payload.remove_channels.contains(channel_id));
446 self.channel_participants
447 .retain(|channel_id, _| !payload.remove_channels.contains(channel_id));
448 self.channels_with_admin_privileges
449 .retain(|channel_id| !payload.remove_channels.contains(channel_id));
450 }
451
452 for channel in payload.channels {
453 if let Some(existing_channel) = self.channels_by_id.get_mut(&channel.id) {
454 // FIXME: We may be missing a path for this existing channel in certain cases
455 let existing_channel = Arc::make_mut(existing_channel);
456 existing_channel.name = channel.name;
457 continue;
458 }
459
460 self.channels_by_id.insert(
461 channel.id,
462 Arc::new(Channel {
463 id: channel.id,
464 name: channel.name,
465 }),
466 );
467
468 if let Some(parent_id) = channel.parent_id {
469 let mut ix = 0;
470 while ix < self.channel_paths.len() {
471 let path = &self.channel_paths[ix];
472 if path.ends_with(&[parent_id]) {
473 let mut new_path = path.clone();
474 new_path.push(channel.id);
475 self.channel_paths.insert(ix + 1, new_path);
476 ix += 1;
477 }
478 ix += 1;
479 }
480 } else {
481 self.channel_paths.push(vec![channel.id]);
482 }
483 }
484
485 self.channel_paths.sort_by(|a, b| {
486 let a = Self::channel_path_sorting_key(a, &self.channels_by_id);
487 let b = Self::channel_path_sorting_key(b, &self.channels_by_id);
488 a.cmp(b)
489 });
490 self.channel_paths.dedup();
491 self.channel_paths.retain(|path| {
492 path.iter()
493 .all(|channel_id| self.channels_by_id.contains_key(channel_id))
494 });
495 }
496
497 for permission in payload.channel_permissions {
498 if permission.is_admin {
499 self.channels_with_admin_privileges
500 .insert(permission.channel_id);
501 } else {
502 self.channels_with_admin_privileges
503 .remove(&permission.channel_id);
504 }
505 }
506
507 cx.notify();
508 if payload.channel_participants.is_empty() {
509 return None;
510 }
511
512 let mut all_user_ids = Vec::new();
513 let channel_participants = payload.channel_participants;
514 for entry in &channel_participants {
515 for user_id in entry.participant_user_ids.iter() {
516 if let Err(ix) = all_user_ids.binary_search(user_id) {
517 all_user_ids.insert(ix, *user_id);
518 }
519 }
520 }
521
522 let users = self
523 .user_store
524 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
525 Some(cx.spawn(|this, mut cx| async move {
526 let users = users.await?;
527
528 this.update(&mut cx, |this, cx| {
529 for entry in &channel_participants {
530 let mut participants: Vec<_> = entry
531 .participant_user_ids
532 .iter()
533 .filter_map(|user_id| {
534 users
535 .binary_search_by_key(&user_id, |user| &user.id)
536 .ok()
537 .map(|ix| users[ix].clone())
538 })
539 .collect();
540
541 participants.sort_by_key(|u| u.id);
542
543 this.channel_participants
544 .insert(entry.channel_id, participants);
545 }
546
547 cx.notify();
548 });
549 anyhow::Ok(())
550 }))
551 }
552
553 fn channel_path_sorting_key<'a>(
554 path: &'a [ChannelId],
555 channels_by_id: &'a HashMap<ChannelId, Arc<Channel>>,
556 ) -> impl 'a + Iterator<Item = Option<&'a str>> {
557 path.iter()
558 .map(|id| Some(channels_by_id.get(id)?.name.as_str()))
559 }
560}