channel_store.rs

  1use crate::Status;
  2use crate::{Client, Subscription, User, UserStore};
  3use anyhow::anyhow;
  4use anyhow::Result;
  5use collections::HashMap;
  6use collections::HashSet;
  7use futures::channel::mpsc;
  8use futures::Future;
  9use futures::StreamExt;
 10use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, Task};
 11use rpc::{proto, TypedEnvelope};
 12use std::sync::Arc;
 13use util::ResultExt;
 14
 15pub type ChannelId = u64;
 16pub type UserId = u64;
 17
 18pub struct ChannelStore {
 19    channels_by_id: HashMap<ChannelId, Arc<Channel>>,
 20    channel_paths: Vec<Vec<ChannelId>>,
 21    channel_invitations: Vec<Arc<Channel>>,
 22    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
 23    channels_with_admin_privileges: HashSet<ChannelId>,
 24    outgoing_invites: HashSet<(ChannelId, UserId)>,
 25    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
 26    client: Arc<Client>,
 27    user_store: ModelHandle<UserStore>,
 28    _rpc_subscription: Subscription,
 29    _watch_connection_status: Task<()>,
 30    _update_channels: Task<()>,
 31}
 32
 33#[derive(Clone, Debug, PartialEq)]
 34pub struct Channel {
 35    pub id: ChannelId,
 36    pub name: String,
 37}
 38
 39pub struct ChannelMembership {
 40    pub user: Arc<User>,
 41    pub kind: proto::channel_member::Kind,
 42    pub admin: bool,
 43}
 44
 45pub enum ChannelEvent {
 46    ChannelCreated(ChannelId),
 47    ChannelRenamed(ChannelId),
 48}
 49
 50impl Entity for ChannelStore {
 51    type Event = ChannelEvent;
 52}
 53
 54pub enum ChannelMemberStatus {
 55    Invited,
 56    Member,
 57    NotMember,
 58}
 59
 60impl ChannelStore {
 61    pub fn new(
 62        client: Arc<Client>,
 63        user_store: ModelHandle<UserStore>,
 64        cx: &mut ModelContext<Self>,
 65    ) -> Self {
 66        let rpc_subscription =
 67            client.add_message_handler(cx.handle(), Self::handle_update_channels);
 68
 69        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 70        let mut connection_status = client.status();
 71        let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
 72            while let Some(status) = connection_status.next().await {
 73                if matches!(status, Status::ConnectionLost | Status::SignedOut) {
 74                    if let Some(this) = this.upgrade(&cx) {
 75                        this.update(&mut cx, |this, cx| {
 76                            this.channels_by_id.clear();
 77                            this.channel_invitations.clear();
 78                            this.channel_participants.clear();
 79                            this.channels_with_admin_privileges.clear();
 80                            this.channel_paths.clear();
 81                            this.outgoing_invites.clear();
 82                            cx.notify();
 83                        });
 84                    } else {
 85                        break;
 86                    }
 87                }
 88            }
 89        });
 90        Self {
 91            channels_by_id: HashMap::default(),
 92            channel_invitations: Vec::default(),
 93            channel_paths: Vec::default(),
 94            channel_participants: Default::default(),
 95            channels_with_admin_privileges: Default::default(),
 96            outgoing_invites: Default::default(),
 97            update_channels_tx,
 98            client,
 99            user_store,
100            _rpc_subscription: rpc_subscription,
101            _watch_connection_status: watch_connection_status,
102            _update_channels: cx.spawn_weak(|this, mut cx| async move {
103                while let Some(update_channels) = update_channels_rx.next().await {
104                    if let Some(this) = this.upgrade(&cx) {
105                        let update_task = this.update(&mut cx, |this, cx| {
106                            this.update_channels(update_channels, cx)
107                        });
108                        if let Some(update_task) = update_task {
109                            update_task.await.log_err();
110                        }
111                    }
112                }
113            }),
114        }
115    }
116
117    pub fn channel_count(&self) -> usize {
118        self.channel_paths.len()
119    }
120
121    pub fn channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
122        self.channel_paths.iter().map(move |path| {
123            let id = path.last().unwrap();
124            let channel = self.channel_for_id(*id).unwrap();
125            (path.len() - 1, channel)
126        })
127    }
128
129    pub fn channel_at_index(&self, ix: usize) -> Option<(usize, &Arc<Channel>)> {
130        let path = self.channel_paths.get(ix)?;
131        let id = path.last().unwrap();
132        let channel = self.channel_for_id(*id).unwrap();
133        Some((path.len() - 1, channel))
134    }
135
136    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
137        &self.channel_invitations
138    }
139
140    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
141        self.channels_by_id.get(&channel_id)
142    }
143
144    pub fn is_user_admin(&self, channel_id: ChannelId) -> bool {
145        self.channel_paths.iter().any(|path| {
146            if let Some(ix) = path.iter().position(|id| *id == channel_id) {
147                path[..=ix]
148                    .iter()
149                    .any(|id| self.channels_with_admin_privileges.contains(id))
150            } else {
151                false
152            }
153        })
154    }
155
156    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
157        self.channel_participants
158            .get(&channel_id)
159            .map_or(&[], |v| v.as_slice())
160    }
161
162    pub fn create_channel(
163        &self,
164        name: &str,
165        parent_id: Option<ChannelId>,
166        cx: &mut ModelContext<Self>,
167    ) -> Task<Result<ChannelId>> {
168        let client = self.client.clone();
169        let name = name.trim_start_matches("#").to_owned();
170        cx.spawn(|this, mut cx| async move {
171            let channel = client
172                .request(proto::CreateChannel { name, parent_id })
173                .await?
174                .channel
175                .ok_or_else(|| anyhow!("missing channel in response"))?;
176
177            let channel_id = channel.id;
178
179            this.update(&mut cx, |this, cx| {
180                let task = this.update_channels(
181                    proto::UpdateChannels {
182                        channels: vec![channel],
183                        ..Default::default()
184                    },
185                    cx,
186                );
187                assert!(task.is_none());
188
189                // This event is emitted because the collab panel wants to clear the pending edit state
190                // before this frame is rendered. But we can't guarantee that the collab panel's future
191                // will resolve before this flush_effects finishes. Synchronously emitting this event
192                // ensures that the collab panel will observe this creation before the frame completes
193                cx.emit(ChannelEvent::ChannelCreated(channel_id));
194            });
195
196            Ok(channel_id)
197        })
198    }
199
200    pub fn invite_member(
201        &mut self,
202        channel_id: ChannelId,
203        user_id: UserId,
204        admin: bool,
205        cx: &mut ModelContext<Self>,
206    ) -> Task<Result<()>> {
207        if !self.outgoing_invites.insert((channel_id, user_id)) {
208            return Task::ready(Err(anyhow!("invite request already in progress")));
209        }
210
211        cx.notify();
212        let client = self.client.clone();
213        cx.spawn(|this, mut cx| async move {
214            let result = client
215                .request(proto::InviteChannelMember {
216                    channel_id,
217                    user_id,
218                    admin,
219                })
220                .await;
221
222            this.update(&mut cx, |this, cx| {
223                this.outgoing_invites.remove(&(channel_id, user_id));
224                cx.notify();
225            });
226
227            result?;
228
229            Ok(())
230        })
231    }
232
233    pub fn remove_member(
234        &mut self,
235        channel_id: ChannelId,
236        user_id: u64,
237        cx: &mut ModelContext<Self>,
238    ) -> Task<Result<()>> {
239        if !self.outgoing_invites.insert((channel_id, user_id)) {
240            return Task::ready(Err(anyhow!("invite request already in progress")));
241        }
242
243        cx.notify();
244        let client = self.client.clone();
245        cx.spawn(|this, mut cx| async move {
246            let result = client
247                .request(proto::RemoveChannelMember {
248                    channel_id,
249                    user_id,
250                })
251                .await;
252
253            this.update(&mut cx, |this, cx| {
254                this.outgoing_invites.remove(&(channel_id, user_id));
255                cx.notify();
256            });
257            result?;
258            Ok(())
259        })
260    }
261
262    pub fn set_member_admin(
263        &mut self,
264        channel_id: ChannelId,
265        user_id: UserId,
266        admin: bool,
267        cx: &mut ModelContext<Self>,
268    ) -> Task<Result<()>> {
269        if !self.outgoing_invites.insert((channel_id, user_id)) {
270            return Task::ready(Err(anyhow!("member request already in progress")));
271        }
272
273        cx.notify();
274        let client = self.client.clone();
275        cx.spawn(|this, mut cx| async move {
276            let result = client
277                .request(proto::SetChannelMemberAdmin {
278                    channel_id,
279                    user_id,
280                    admin,
281                })
282                .await;
283
284            this.update(&mut cx, |this, cx| {
285                this.outgoing_invites.remove(&(channel_id, user_id));
286                cx.notify();
287            });
288
289            result?;
290            Ok(())
291        })
292    }
293
294    pub fn rename(
295        &mut self,
296        channel_id: ChannelId,
297        new_name: &str,
298        cx: &mut ModelContext<Self>,
299    ) -> Task<Result<()>> {
300        let client = self.client.clone();
301        let name = new_name.to_string();
302        cx.spawn(|this, mut cx| async move {
303            let channel = client
304                .request(proto::RenameChannel { channel_id, name })
305                .await?
306                .channel
307                .ok_or_else(|| anyhow!("missing channel in response"))?;
308            this.update(&mut cx, |this, cx| {
309                let task = this.update_channels(
310                    proto::UpdateChannels {
311                        channels: vec![channel],
312                        ..Default::default()
313                    },
314                    cx,
315                );
316                assert!(task.is_none());
317
318                // This event is emitted because the collab panel wants to clear the pending edit state
319                // before this frame is rendered. But we can't guarantee that the collab panel's future
320                // will resolve before this flush_effects finishes. Synchronously emitting this event
321                // ensures that the collab panel will observe this creation before the frame complete
322                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
323            });
324            Ok(())
325        })
326    }
327
328    pub fn respond_to_channel_invite(
329        &mut self,
330        channel_id: ChannelId,
331        accept: bool,
332    ) -> impl Future<Output = Result<()>> {
333        let client = self.client.clone();
334        async move {
335            client
336                .request(proto::RespondToChannelInvite { channel_id, accept })
337                .await?;
338            Ok(())
339        }
340    }
341
342    pub fn get_channel_member_details(
343        &self,
344        channel_id: ChannelId,
345        cx: &mut ModelContext<Self>,
346    ) -> Task<Result<Vec<ChannelMembership>>> {
347        let client = self.client.clone();
348        let user_store = self.user_store.downgrade();
349        cx.spawn(|_, mut cx| async move {
350            let response = client
351                .request(proto::GetChannelMembers { channel_id })
352                .await?;
353
354            let user_ids = response.members.iter().map(|m| m.user_id).collect();
355            let user_store = user_store
356                .upgrade(&cx)
357                .ok_or_else(|| anyhow!("user store dropped"))?;
358            let users = user_store
359                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
360                .await?;
361
362            Ok(users
363                .into_iter()
364                .zip(response.members)
365                .filter_map(|(user, member)| {
366                    Some(ChannelMembership {
367                        user,
368                        admin: member.admin,
369                        kind: proto::channel_member::Kind::from_i32(member.kind)?,
370                    })
371                })
372                .collect())
373        })
374    }
375
376    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
377        let client = self.client.clone();
378        async move {
379            client.request(proto::RemoveChannel { channel_id }).await?;
380            Ok(())
381        }
382    }
383
384    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
385        false
386    }
387
388    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
389        self.outgoing_invites.contains(&(channel_id, user_id))
390    }
391
392    async fn handle_update_channels(
393        this: ModelHandle<Self>,
394        message: TypedEnvelope<proto::UpdateChannels>,
395        _: Arc<Client>,
396        mut cx: AsyncAppContext,
397    ) -> Result<()> {
398        this.update(&mut cx, |this, _| {
399            this.update_channels_tx
400                .unbounded_send(message.payload)
401                .unwrap();
402        });
403        Ok(())
404    }
405
406    pub(crate) fn update_channels(
407        &mut self,
408        payload: proto::UpdateChannels,
409        cx: &mut ModelContext<ChannelStore>,
410    ) -> Option<Task<Result<()>>> {
411        if !payload.remove_channel_invitations.is_empty() {
412            self.channel_invitations
413                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
414        }
415        for channel in payload.channel_invitations {
416            match self
417                .channel_invitations
418                .binary_search_by_key(&channel.id, |c| c.id)
419            {
420                Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
421                Err(ix) => self.channel_invitations.insert(
422                    ix,
423                    Arc::new(Channel {
424                        id: channel.id,
425                        name: channel.name,
426                    }),
427                ),
428            }
429        }
430
431        let channels_changed = !payload.channels.is_empty() || !payload.remove_channels.is_empty();
432        if channels_changed {
433            if !payload.remove_channels.is_empty() {
434                self.channels_by_id
435                    .retain(|channel_id, _| !payload.remove_channels.contains(channel_id));
436                self.channel_participants
437                    .retain(|channel_id, _| !payload.remove_channels.contains(channel_id));
438                self.channels_with_admin_privileges
439                    .retain(|channel_id| !payload.remove_channels.contains(channel_id));
440            }
441
442            for channel in payload.channels {
443                if let Some(existing_channel) = self.channels_by_id.get_mut(&channel.id) {
444                    // FIXME: We may be missing a path for this existing channel in certain cases
445                    let existing_channel = Arc::make_mut(existing_channel);
446                    existing_channel.name = channel.name;
447                    continue;
448                }
449
450                self.channels_by_id.insert(
451                    channel.id,
452                    Arc::new(Channel {
453                        id: channel.id,
454                        name: channel.name,
455                    }),
456                );
457
458                if let Some(parent_id) = channel.parent_id {
459                    let mut ix = 0;
460                    while ix < self.channel_paths.len() {
461                        let path = &self.channel_paths[ix];
462                        if path.ends_with(&[parent_id]) {
463                            let mut new_path = path.clone();
464                            new_path.push(channel.id);
465                            self.channel_paths.insert(ix + 1, new_path);
466                            ix += 1;
467                        }
468                        ix += 1;
469                    }
470                } else {
471                    self.channel_paths.push(vec![channel.id]);
472                }
473            }
474
475            self.channel_paths.sort_by(|a, b| {
476                let a = Self::channel_path_sorting_key(a, &self.channels_by_id);
477                let b = Self::channel_path_sorting_key(b, &self.channels_by_id);
478                a.cmp(b)
479            });
480            self.channel_paths.dedup();
481            self.channel_paths.retain(|path| {
482                path.iter()
483                    .all(|channel_id| self.channels_by_id.contains_key(channel_id))
484            });
485        }
486
487        for permission in payload.channel_permissions {
488            if permission.is_admin {
489                self.channels_with_admin_privileges
490                    .insert(permission.channel_id);
491            } else {
492                self.channels_with_admin_privileges
493                    .remove(&permission.channel_id);
494            }
495        }
496
497        cx.notify();
498        if payload.channel_participants.is_empty() {
499            return None;
500        }
501
502        let mut all_user_ids = Vec::new();
503        let channel_participants = payload.channel_participants;
504        for entry in &channel_participants {
505            for user_id in entry.participant_user_ids.iter() {
506                if let Err(ix) = all_user_ids.binary_search(user_id) {
507                    all_user_ids.insert(ix, *user_id);
508                }
509            }
510        }
511
512        let users = self
513            .user_store
514            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
515        Some(cx.spawn(|this, mut cx| async move {
516            let users = users.await?;
517
518            this.update(&mut cx, |this, cx| {
519                for entry in &channel_participants {
520                    let mut participants: Vec<_> = entry
521                        .participant_user_ids
522                        .iter()
523                        .filter_map(|user_id| {
524                            users
525                                .binary_search_by_key(&user_id, |user| &user.id)
526                                .ok()
527                                .map(|ix| users[ix].clone())
528                        })
529                        .collect();
530
531                    participants.sort_by_key(|u| u.id);
532
533                    this.channel_participants
534                        .insert(entry.channel_id, participants);
535                }
536
537                cx.notify();
538            });
539            anyhow::Ok(())
540        }))
541    }
542
543    fn channel_path_sorting_key<'a>(
544        path: &'a [ChannelId],
545        channels_by_id: &'a HashMap<ChannelId, Arc<Channel>>,
546    ) -> impl 'a + Iterator<Item = Option<&'a str>> {
547        path.iter()
548            .map(|id| Some(channels_by_id.get(id)?.name.as_str()))
549    }
550}