1use crate::Status;
2use crate::{Client, Subscription, User, UserStore};
3use anyhow::anyhow;
4use anyhow::Result;
5use collections::HashMap;
6use collections::HashSet;
7use futures::channel::mpsc;
8use futures::Future;
9use futures::StreamExt;
10use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, Task};
11use rpc::{proto, TypedEnvelope};
12use std::sync::Arc;
13use util::ResultExt;
14
15pub type ChannelId = u64;
16pub type UserId = u64;
17
18pub struct ChannelStore {
19 channels_by_id: HashMap<ChannelId, Arc<Channel>>,
20 channel_paths: Vec<Vec<ChannelId>>,
21 channel_invitations: Vec<Arc<Channel>>,
22 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
23 channels_with_admin_privileges: HashSet<ChannelId>,
24 outgoing_invites: HashSet<(ChannelId, UserId)>,
25 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
26 client: Arc<Client>,
27 user_store: ModelHandle<UserStore>,
28 _rpc_subscription: Subscription,
29 _watch_connection_status: Task<()>,
30 _update_channels: Task<()>,
31}
32
33#[derive(Clone, Debug, PartialEq)]
34pub struct Channel {
35 pub id: ChannelId,
36 pub name: String,
37}
38
39pub struct ChannelMembership {
40 pub user: Arc<User>,
41 pub kind: proto::channel_member::Kind,
42 pub admin: bool,
43}
44
45pub enum ChannelEvent {
46 ChannelCreated(ChannelId),
47 ChannelRenamed(ChannelId),
48}
49
50impl Entity for ChannelStore {
51 type Event = ChannelEvent;
52}
53
54pub enum ChannelMemberStatus {
55 Invited,
56 Member,
57 NotMember,
58}
59
60impl ChannelStore {
61 pub fn new(
62 client: Arc<Client>,
63 user_store: ModelHandle<UserStore>,
64 cx: &mut ModelContext<Self>,
65 ) -> Self {
66 let rpc_subscription =
67 client.add_message_handler(cx.handle(), Self::handle_update_channels);
68
69 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
70 let mut connection_status = client.status();
71 let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
72 while let Some(status) = connection_status.next().await {
73 if matches!(status, Status::ConnectionLost | Status::SignedOut) {
74 if let Some(this) = this.upgrade(&cx) {
75 this.update(&mut cx, |this, cx| {
76 this.channels_by_id.clear();
77 this.channel_invitations.clear();
78 this.channel_participants.clear();
79 this.channels_with_admin_privileges.clear();
80 this.channel_paths.clear();
81 this.outgoing_invites.clear();
82 cx.notify();
83 });
84 } else {
85 break;
86 }
87 }
88 }
89 });
90 Self {
91 channels_by_id: HashMap::default(),
92 channel_invitations: Vec::default(),
93 channel_paths: Vec::default(),
94 channel_participants: Default::default(),
95 channels_with_admin_privileges: Default::default(),
96 outgoing_invites: Default::default(),
97 update_channels_tx,
98 client,
99 user_store,
100 _rpc_subscription: rpc_subscription,
101 _watch_connection_status: watch_connection_status,
102 _update_channels: cx.spawn_weak(|this, mut cx| async move {
103 while let Some(update_channels) = update_channels_rx.next().await {
104 if let Some(this) = this.upgrade(&cx) {
105 let update_task = this.update(&mut cx, |this, cx| {
106 this.update_channels(update_channels, cx)
107 });
108 if let Some(update_task) = update_task {
109 update_task.await.log_err();
110 }
111 }
112 }
113 }),
114 }
115 }
116
117 pub fn channel_count(&self) -> usize {
118 self.channel_paths.len()
119 }
120
121 pub fn channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
122 self.channel_paths.iter().map(move |path| {
123 let id = path.last().unwrap();
124 let channel = self.channel_for_id(*id).unwrap();
125 (path.len() - 1, channel)
126 })
127 }
128
129 pub fn channel_at_index(&self, ix: usize) -> Option<(usize, &Arc<Channel>)> {
130 let path = self.channel_paths.get(ix)?;
131 let id = path.last().unwrap();
132 let channel = self.channel_for_id(*id).unwrap();
133 Some((path.len() - 1, channel))
134 }
135
136 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
137 &self.channel_invitations
138 }
139
140 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
141 self.channels_by_id.get(&channel_id)
142 }
143
144 pub fn is_user_admin(&self, channel_id: ChannelId) -> bool {
145 self.channel_paths.iter().any(|path| {
146 if let Some(ix) = path.iter().position(|id| *id == channel_id) {
147 path[..=ix]
148 .iter()
149 .any(|id| self.channels_with_admin_privileges.contains(id))
150 } else {
151 false
152 }
153 })
154 }
155
156 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
157 self.channel_participants
158 .get(&channel_id)
159 .map_or(&[], |v| v.as_slice())
160 }
161
162 pub fn create_channel(
163 &self,
164 name: &str,
165 parent_id: Option<ChannelId>,
166 cx: &mut ModelContext<Self>,
167 ) -> Task<Result<ChannelId>> {
168 let client = self.client.clone();
169 let name = name.trim_start_matches("#").to_owned();
170 cx.spawn(|this, mut cx| async move {
171 let channel = client
172 .request(proto::CreateChannel { name, parent_id })
173 .await?
174 .channel
175 .ok_or_else(|| anyhow!("missing channel in response"))?;
176
177 let channel_id = channel.id;
178
179 this.update(&mut cx, |this, cx| {
180 let task = this.update_channels(
181 proto::UpdateChannels {
182 channels: vec![channel],
183 ..Default::default()
184 },
185 cx,
186 );
187 assert!(task.is_none());
188
189 // This event is emitted because the collab panel wants to clear the pending edit state
190 // before this frame is rendered. But we can't guarantee that the collab panel's future
191 // will resolve before this flush_effects finishes. Synchronously emitting this event
192 // ensures that the collab panel will observe this creation before the frame completes
193 cx.emit(ChannelEvent::ChannelCreated(channel_id));
194 });
195
196 Ok(channel_id)
197 })
198 }
199
200 pub fn invite_member(
201 &mut self,
202 channel_id: ChannelId,
203 user_id: UserId,
204 admin: bool,
205 cx: &mut ModelContext<Self>,
206 ) -> Task<Result<()>> {
207 if !self.outgoing_invites.insert((channel_id, user_id)) {
208 return Task::ready(Err(anyhow!("invite request already in progress")));
209 }
210
211 cx.notify();
212 let client = self.client.clone();
213 cx.spawn(|this, mut cx| async move {
214 let result = client
215 .request(proto::InviteChannelMember {
216 channel_id,
217 user_id,
218 admin,
219 })
220 .await;
221
222 this.update(&mut cx, |this, cx| {
223 this.outgoing_invites.remove(&(channel_id, user_id));
224 cx.notify();
225 });
226
227 result?;
228
229 Ok(())
230 })
231 }
232
233 pub fn remove_member(
234 &mut self,
235 channel_id: ChannelId,
236 user_id: u64,
237 cx: &mut ModelContext<Self>,
238 ) -> Task<Result<()>> {
239 if !self.outgoing_invites.insert((channel_id, user_id)) {
240 return Task::ready(Err(anyhow!("invite request already in progress")));
241 }
242
243 cx.notify();
244 let client = self.client.clone();
245 cx.spawn(|this, mut cx| async move {
246 let result = client
247 .request(proto::RemoveChannelMember {
248 channel_id,
249 user_id,
250 })
251 .await;
252
253 this.update(&mut cx, |this, cx| {
254 this.outgoing_invites.remove(&(channel_id, user_id));
255 cx.notify();
256 });
257 result?;
258 Ok(())
259 })
260 }
261
262 pub fn set_member_admin(
263 &mut self,
264 channel_id: ChannelId,
265 user_id: UserId,
266 admin: bool,
267 cx: &mut ModelContext<Self>,
268 ) -> Task<Result<()>> {
269 if !self.outgoing_invites.insert((channel_id, user_id)) {
270 return Task::ready(Err(anyhow!("member request already in progress")));
271 }
272
273 cx.notify();
274 let client = self.client.clone();
275 cx.spawn(|this, mut cx| async move {
276 let result = client
277 .request(proto::SetChannelMemberAdmin {
278 channel_id,
279 user_id,
280 admin,
281 })
282 .await;
283
284 this.update(&mut cx, |this, cx| {
285 this.outgoing_invites.remove(&(channel_id, user_id));
286 cx.notify();
287 });
288
289 result?;
290 Ok(())
291 })
292 }
293
294 pub fn rename(
295 &mut self,
296 channel_id: ChannelId,
297 new_name: &str,
298 cx: &mut ModelContext<Self>,
299 ) -> Task<Result<()>> {
300 let client = self.client.clone();
301 let name = new_name.to_string();
302 cx.spawn(|this, mut cx| async move {
303 let channel = client
304 .request(proto::RenameChannel { channel_id, name })
305 .await?
306 .channel
307 .ok_or_else(|| anyhow!("missing channel in response"))?;
308 this.update(&mut cx, |this, cx| {
309 let task = this.update_channels(
310 proto::UpdateChannels {
311 channels: vec![channel],
312 ..Default::default()
313 },
314 cx,
315 );
316 assert!(task.is_none());
317
318 // This event is emitted because the collab panel wants to clear the pending edit state
319 // before this frame is rendered. But we can't guarantee that the collab panel's future
320 // will resolve before this flush_effects finishes. Synchronously emitting this event
321 // ensures that the collab panel will observe this creation before the frame complete
322 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
323 });
324 Ok(())
325 })
326 }
327
328 pub fn respond_to_channel_invite(
329 &mut self,
330 channel_id: ChannelId,
331 accept: bool,
332 ) -> impl Future<Output = Result<()>> {
333 let client = self.client.clone();
334 async move {
335 client
336 .request(proto::RespondToChannelInvite { channel_id, accept })
337 .await?;
338 Ok(())
339 }
340 }
341
342 pub fn get_channel_member_details(
343 &self,
344 channel_id: ChannelId,
345 cx: &mut ModelContext<Self>,
346 ) -> Task<Result<Vec<ChannelMembership>>> {
347 let client = self.client.clone();
348 let user_store = self.user_store.downgrade();
349 cx.spawn(|_, mut cx| async move {
350 let response = client
351 .request(proto::GetChannelMembers { channel_id })
352 .await?;
353
354 let user_ids = response.members.iter().map(|m| m.user_id).collect();
355 let user_store = user_store
356 .upgrade(&cx)
357 .ok_or_else(|| anyhow!("user store dropped"))?;
358 let users = user_store
359 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
360 .await?;
361
362 Ok(users
363 .into_iter()
364 .zip(response.members)
365 .filter_map(|(user, member)| {
366 Some(ChannelMembership {
367 user,
368 admin: member.admin,
369 kind: proto::channel_member::Kind::from_i32(member.kind)?,
370 })
371 })
372 .collect())
373 })
374 }
375
376 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
377 let client = self.client.clone();
378 async move {
379 client.request(proto::RemoveChannel { channel_id }).await?;
380 Ok(())
381 }
382 }
383
384 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
385 false
386 }
387
388 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
389 self.outgoing_invites.contains(&(channel_id, user_id))
390 }
391
392 async fn handle_update_channels(
393 this: ModelHandle<Self>,
394 message: TypedEnvelope<proto::UpdateChannels>,
395 _: Arc<Client>,
396 mut cx: AsyncAppContext,
397 ) -> Result<()> {
398 this.update(&mut cx, |this, _| {
399 this.update_channels_tx
400 .unbounded_send(message.payload)
401 .unwrap();
402 });
403 Ok(())
404 }
405
406 pub(crate) fn update_channels(
407 &mut self,
408 payload: proto::UpdateChannels,
409 cx: &mut ModelContext<ChannelStore>,
410 ) -> Option<Task<Result<()>>> {
411 if !payload.remove_channel_invitations.is_empty() {
412 self.channel_invitations
413 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
414 }
415 for channel in payload.channel_invitations {
416 match self
417 .channel_invitations
418 .binary_search_by_key(&channel.id, |c| c.id)
419 {
420 Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
421 Err(ix) => self.channel_invitations.insert(
422 ix,
423 Arc::new(Channel {
424 id: channel.id,
425 name: channel.name,
426 }),
427 ),
428 }
429 }
430
431 let channels_changed = !payload.channels.is_empty() || !payload.remove_channels.is_empty();
432 if channels_changed {
433 if !payload.remove_channels.is_empty() {
434 self.channels_by_id
435 .retain(|channel_id, _| !payload.remove_channels.contains(channel_id));
436 self.channel_participants
437 .retain(|channel_id, _| !payload.remove_channels.contains(channel_id));
438 self.channels_with_admin_privileges
439 .retain(|channel_id| !payload.remove_channels.contains(channel_id));
440 }
441
442 for channel in payload.channels {
443 if let Some(existing_channel) = self.channels_by_id.get_mut(&channel.id) {
444 // FIXME: We may be missing a path for this existing channel in certain cases
445 let existing_channel = Arc::make_mut(existing_channel);
446 existing_channel.name = channel.name;
447 continue;
448 }
449
450 self.channels_by_id.insert(
451 channel.id,
452 Arc::new(Channel {
453 id: channel.id,
454 name: channel.name,
455 }),
456 );
457
458 if let Some(parent_id) = channel.parent_id {
459 let mut ix = 0;
460 while ix < self.channel_paths.len() {
461 let path = &self.channel_paths[ix];
462 if path.ends_with(&[parent_id]) {
463 let mut new_path = path.clone();
464 new_path.push(channel.id);
465 self.channel_paths.insert(ix + 1, new_path);
466 ix += 1;
467 }
468 ix += 1;
469 }
470 } else {
471 self.channel_paths.push(vec![channel.id]);
472 }
473 }
474
475 self.channel_paths.sort_by(|a, b| {
476 let a = Self::channel_path_sorting_key(a, &self.channels_by_id);
477 let b = Self::channel_path_sorting_key(b, &self.channels_by_id);
478 a.cmp(b)
479 });
480 self.channel_paths.dedup();
481 self.channel_paths.retain(|path| {
482 path.iter()
483 .all(|channel_id| self.channels_by_id.contains_key(channel_id))
484 });
485 }
486
487 for permission in payload.channel_permissions {
488 if permission.is_admin {
489 self.channels_with_admin_privileges
490 .insert(permission.channel_id);
491 } else {
492 self.channels_with_admin_privileges
493 .remove(&permission.channel_id);
494 }
495 }
496
497 cx.notify();
498 if payload.channel_participants.is_empty() {
499 return None;
500 }
501
502 let mut all_user_ids = Vec::new();
503 let channel_participants = payload.channel_participants;
504 for entry in &channel_participants {
505 for user_id in entry.participant_user_ids.iter() {
506 if let Err(ix) = all_user_ids.binary_search(user_id) {
507 all_user_ids.insert(ix, *user_id);
508 }
509 }
510 }
511
512 let users = self
513 .user_store
514 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
515 Some(cx.spawn(|this, mut cx| async move {
516 let users = users.await?;
517
518 this.update(&mut cx, |this, cx| {
519 for entry in &channel_participants {
520 let mut participants: Vec<_> = entry
521 .participant_user_ids
522 .iter()
523 .filter_map(|user_id| {
524 users
525 .binary_search_by_key(&user_id, |user| &user.id)
526 .ok()
527 .map(|ix| users[ix].clone())
528 })
529 .collect();
530
531 participants.sort_by_key(|u| u.id);
532
533 this.channel_participants
534 .insert(entry.channel_id, participants);
535 }
536
537 cx.notify();
538 });
539 anyhow::Ok(())
540 }))
541 }
542
543 fn channel_path_sorting_key<'a>(
544 path: &'a [ChannelId],
545 channels_by_id: &'a HashMap<ChannelId, Arc<Channel>>,
546 ) -> impl 'a + Iterator<Item = Option<&'a str>> {
547 path.iter()
548 .map(|id| Some(channels_by_id.get(id)?.name.as_str()))
549 }
550}