channel_store.rs

  1use crate::Status;
  2use crate::{Client, Subscription, User, UserStore};
  3use anyhow::anyhow;
  4use anyhow::Result;
  5use collections::HashMap;
  6use collections::HashSet;
  7use futures::channel::mpsc;
  8use futures::Future;
  9use futures::StreamExt;
 10use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, Task};
 11use rpc::{proto, TypedEnvelope};
 12use std::sync::Arc;
 13use util::ResultExt;
 14
/// Identifier of a channel; used as the key of the store's channel maps and
/// as the element type of channel paths.
pub type ChannelId = u64;
/// Identifier of a user, as carried in the member-related requests below.
pub type UserId = u64;
 17
/// Client-side cache of the channels, invitations, participants and admin
/// permissions reported through `proto::UpdateChannels` messages.
pub struct ChannelStore {
    /// Every known channel, keyed by its id.
    channels_by_id: HashMap<ChannelId, Arc<Channel>>,
    /// One entry per listed channel. Each path ends with the id of the channel
    /// it represents and is built by appending that id to a path ending in its
    /// parent; `path.len() - 1` is reported as the channel's depth.
    channel_paths: Vec<Vec<ChannelId>>,
    /// Channels the user has been invited to. `update_channels` inserts new
    /// entries at the position found by a binary search on the channel id.
    channel_invitations: Vec<Arc<Channel>>,
    /// Users per channel, as resolved from the most recent participant update.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Channels for which a permission entry with `is_admin == true` was received.
    channels_with_admin_privileges: HashSet<ChannelId>,
    /// `(channel, user)` pairs that currently have a member request in flight
    /// (invite, remove, or set-admin all share this set).
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sending half of the queue that `handle_update_channels` pushes into and
    /// the `_update_channels` task drains.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// RPC client used for every request issued by the store.
    client: Arc<Client>,
    /// Used to resolve user ids into `User` values.
    user_store: ModelHandle<UserStore>,
    /// Value returned by `Client::add_message_handler`; stored but never read
    /// here (presumably keeps the handler registered — confirm against `Client`).
    _rpc_subscription: Subscription,
    /// Task that clears all cached state when the connection is lost or the
    /// user signs out.
    _watch_connection_status: Task<()>,
    /// Task that applies queued `UpdateChannels` payloads one at a time.
    _update_channels: Task<()>,
}
 32
 33#[derive(Clone, Debug, PartialEq)]
 34pub struct Channel {
 35    pub id: ChannelId,
 36    pub name: String,
 37}
 38
/// One member of a channel, as returned by
/// `ChannelStore::get_channel_member_details`.
pub struct ChannelMembership {
    /// The resolved user.
    pub user: Arc<User>,
    /// Membership kind, decoded from the raw `kind` value of the proto member.
    pub kind: proto::channel_member::Kind,
    /// Copied from the proto member's `admin` flag.
    pub admin: bool,
}
 44
 45pub enum ChannelEvent {
 46    ChannelCreated(ChannelId),
 47    ChannelRenamed(ChannelId),
 48}
 49
// Makes `ChannelStore` a gpui entity whose emitted events are `ChannelEvent`s.
impl Entity for ChannelStore {
    type Event = ChannelEvent;
}
 53
 54pub enum ChannelMemberStatus {
 55    Invited,
 56    Member,
 57    NotMember,
 58}
 59
 60impl ChannelStore {
 61    pub fn new(
 62        client: Arc<Client>,
 63        user_store: ModelHandle<UserStore>,
 64        cx: &mut ModelContext<Self>,
 65    ) -> Self {
 66        let rpc_subscription =
 67            client.add_message_handler(cx.handle(), Self::handle_update_channels);
 68
 69        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 70        let mut connection_status = client.status();
 71        let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
 72            while let Some(status) = connection_status.next().await {
 73                if matches!(status, Status::ConnectionLost | Status::SignedOut) {
 74                    if let Some(this) = this.upgrade(&cx) {
 75                        this.update(&mut cx, |this, cx| {
 76                            this.channels_by_id.clear();
 77                            this.channel_invitations.clear();
 78                            this.channel_participants.clear();
 79                            this.channels_with_admin_privileges.clear();
 80                            this.channel_paths.clear();
 81                            this.outgoing_invites.clear();
 82                            cx.notify();
 83                        });
 84                    } else {
 85                        break;
 86                    }
 87                }
 88            }
 89        });
 90        Self {
 91            channels_by_id: HashMap::default(),
 92            channel_invitations: Vec::default(),
 93            channel_paths: Vec::default(),
 94            channel_participants: Default::default(),
 95            channels_with_admin_privileges: Default::default(),
 96            outgoing_invites: Default::default(),
 97            update_channels_tx,
 98            client,
 99            user_store,
100            _rpc_subscription: rpc_subscription,
101            _watch_connection_status: watch_connection_status,
102            _update_channels: cx.spawn_weak(|this, mut cx| async move {
103                while let Some(update_channels) = update_channels_rx.next().await {
104                    if let Some(this) = this.upgrade(&cx) {
105                        let update_task = this.update(&mut cx, |this, cx| {
106                            this.update_channels(update_channels, cx)
107                        });
108                        if let Some(update_task) = update_task {
109                            update_task.await.log_err();
110                        }
111                    }
112                }
113            }),
114        }
115    }
116
117    pub fn has_children(&self, channel_id: ChannelId) -> bool {
118        self.channel_paths.iter().any(|path| {
119            if let Some(ix) = path.iter().position(|id| *id == channel_id) {
120                path.len() > ix + 1
121            } else {
122                false
123            }
124        })
125    }
126
    /// Number of channel paths currently held, which is also the number of
    /// items yielded by `channels()`.
    pub fn channel_count(&self) -> usize {
        self.channel_paths.len()
    }
130
131    pub fn channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
132        self.channel_paths.iter().map(move |path| {
133            let id = path.last().unwrap();
134            let channel = self.channel_for_id(*id).unwrap();
135            (path.len() - 1, channel)
136        })
137    }
138
139    pub fn channel_at_index(&self, ix: usize) -> Option<(usize, &Arc<Channel>)> {
140        let path = self.channel_paths.get(ix)?;
141        let id = path.last().unwrap();
142        let channel = self.channel_for_id(*id).unwrap();
143        Some((path.len() - 1, channel))
144    }
145
    /// Channels the user currently holds an invitation to.
    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
        &self.channel_invitations
    }
149
    /// Looks up a cached channel by id; `None` when the id is unknown.
    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
        self.channels_by_id.get(&channel_id)
    }
153
154    pub fn is_user_admin(&self, channel_id: ChannelId) -> bool {
155        self.channel_paths.iter().any(|path| {
156            if let Some(ix) = path.iter().position(|id| *id == channel_id) {
157                path[..=ix]
158                    .iter()
159                    .any(|id| self.channels_with_admin_privileges.contains(id))
160            } else {
161                false
162            }
163        })
164    }
165
166    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
167        self.channel_participants
168            .get(&channel_id)
169            .map_or(&[], |v| v.as_slice())
170    }
171
172    pub fn create_channel(
173        &self,
174        name: &str,
175        parent_id: Option<ChannelId>,
176        cx: &mut ModelContext<Self>,
177    ) -> Task<Result<ChannelId>> {
178        let client = self.client.clone();
179        let name = name.trim_start_matches("#").to_owned();
180        cx.spawn(|this, mut cx| async move {
181            let channel = client
182                .request(proto::CreateChannel { name, parent_id })
183                .await?
184                .channel
185                .ok_or_else(|| anyhow!("missing channel in response"))?;
186
187            let channel_id = channel.id;
188
189            this.update(&mut cx, |this, cx| {
190                let task = this.update_channels(
191                    proto::UpdateChannels {
192                        channels: vec![channel],
193                        ..Default::default()
194                    },
195                    cx,
196                );
197                assert!(task.is_none());
198
199                // This event is emitted because the collab panel wants to clear the pending edit state
200                // before this frame is rendered. But we can't guarantee that the collab panel's future
201                // will resolve before this flush_effects finishes. Synchronously emitting this event
202                // ensures that the collab panel will observe this creation before the frame completes
203                cx.emit(ChannelEvent::ChannelCreated(channel_id));
204            });
205
206            Ok(channel_id)
207        })
208    }
209
210    pub fn invite_member(
211        &mut self,
212        channel_id: ChannelId,
213        user_id: UserId,
214        admin: bool,
215        cx: &mut ModelContext<Self>,
216    ) -> Task<Result<()>> {
217        if !self.outgoing_invites.insert((channel_id, user_id)) {
218            return Task::ready(Err(anyhow!("invite request already in progress")));
219        }
220
221        cx.notify();
222        let client = self.client.clone();
223        cx.spawn(|this, mut cx| async move {
224            let result = client
225                .request(proto::InviteChannelMember {
226                    channel_id,
227                    user_id,
228                    admin,
229                })
230                .await;
231
232            this.update(&mut cx, |this, cx| {
233                this.outgoing_invites.remove(&(channel_id, user_id));
234                cx.notify();
235            });
236
237            result?;
238
239            Ok(())
240        })
241    }
242
243    pub fn remove_member(
244        &mut self,
245        channel_id: ChannelId,
246        user_id: u64,
247        cx: &mut ModelContext<Self>,
248    ) -> Task<Result<()>> {
249        if !self.outgoing_invites.insert((channel_id, user_id)) {
250            return Task::ready(Err(anyhow!("invite request already in progress")));
251        }
252
253        cx.notify();
254        let client = self.client.clone();
255        cx.spawn(|this, mut cx| async move {
256            let result = client
257                .request(proto::RemoveChannelMember {
258                    channel_id,
259                    user_id,
260                })
261                .await;
262
263            this.update(&mut cx, |this, cx| {
264                this.outgoing_invites.remove(&(channel_id, user_id));
265                cx.notify();
266            });
267            result?;
268            Ok(())
269        })
270    }
271
272    pub fn set_member_admin(
273        &mut self,
274        channel_id: ChannelId,
275        user_id: UserId,
276        admin: bool,
277        cx: &mut ModelContext<Self>,
278    ) -> Task<Result<()>> {
279        if !self.outgoing_invites.insert((channel_id, user_id)) {
280            return Task::ready(Err(anyhow!("member request already in progress")));
281        }
282
283        cx.notify();
284        let client = self.client.clone();
285        cx.spawn(|this, mut cx| async move {
286            let result = client
287                .request(proto::SetChannelMemberAdmin {
288                    channel_id,
289                    user_id,
290                    admin,
291                })
292                .await;
293
294            this.update(&mut cx, |this, cx| {
295                this.outgoing_invites.remove(&(channel_id, user_id));
296                cx.notify();
297            });
298
299            result?;
300            Ok(())
301        })
302    }
303
304    pub fn rename(
305        &mut self,
306        channel_id: ChannelId,
307        new_name: &str,
308        cx: &mut ModelContext<Self>,
309    ) -> Task<Result<()>> {
310        let client = self.client.clone();
311        let name = new_name.to_string();
312        cx.spawn(|this, mut cx| async move {
313            let channel = client
314                .request(proto::RenameChannel { channel_id, name })
315                .await?
316                .channel
317                .ok_or_else(|| anyhow!("missing channel in response"))?;
318            this.update(&mut cx, |this, cx| {
319                let task = this.update_channels(
320                    proto::UpdateChannels {
321                        channels: vec![channel],
322                        ..Default::default()
323                    },
324                    cx,
325                );
326                assert!(task.is_none());
327
328                // This event is emitted because the collab panel wants to clear the pending edit state
329                // before this frame is rendered. But we can't guarantee that the collab panel's future
330                // will resolve before this flush_effects finishes. Synchronously emitting this event
331                // ensures that the collab panel will observe this creation before the frame complete
332                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
333            });
334            Ok(())
335        })
336    }
337
338    pub fn respond_to_channel_invite(
339        &mut self,
340        channel_id: ChannelId,
341        accept: bool,
342    ) -> impl Future<Output = Result<()>> {
343        let client = self.client.clone();
344        async move {
345            client
346                .request(proto::RespondToChannelInvite { channel_id, accept })
347                .await?;
348            Ok(())
349        }
350    }
351
352    pub fn get_channel_member_details(
353        &self,
354        channel_id: ChannelId,
355        cx: &mut ModelContext<Self>,
356    ) -> Task<Result<Vec<ChannelMembership>>> {
357        let client = self.client.clone();
358        let user_store = self.user_store.downgrade();
359        cx.spawn(|_, mut cx| async move {
360            let response = client
361                .request(proto::GetChannelMembers { channel_id })
362                .await?;
363
364            let user_ids = response.members.iter().map(|m| m.user_id).collect();
365            let user_store = user_store
366                .upgrade(&cx)
367                .ok_or_else(|| anyhow!("user store dropped"))?;
368            let users = user_store
369                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
370                .await?;
371
372            Ok(users
373                .into_iter()
374                .zip(response.members)
375                .filter_map(|(user, member)| {
376                    Some(ChannelMembership {
377                        user,
378                        admin: member.admin,
379                        kind: proto::channel_member::Kind::from_i32(member.kind)?,
380                    })
381                })
382                .collect())
383        })
384    }
385
386    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
387        let client = self.client.clone();
388        async move {
389            client.request(proto::RemoveChannel { channel_id }).await?;
390            Ok(())
391        }
392    }
393
    /// Stub: always returns `false`; the channel argument is ignored and no
    /// pending-response state is tracked by the store.
    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
        false
    }
397
    /// Returns `true` while a member request for `(channel_id, user_id)` is in
    /// flight. The underlying set is shared by `invite_member`,
    /// `remove_member` and `set_member_admin`, so this is also `true` during a
    /// removal or an admin change.
    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
        self.outgoing_invites.contains(&(channel_id, user_id))
    }
401
402    async fn handle_update_channels(
403        this: ModelHandle<Self>,
404        message: TypedEnvelope<proto::UpdateChannels>,
405        _: Arc<Client>,
406        mut cx: AsyncAppContext,
407    ) -> Result<()> {
408        this.update(&mut cx, |this, _| {
409            this.update_channels_tx
410                .unbounded_send(message.payload)
411                .unwrap();
412        });
413        Ok(())
414    }
415
416    pub(crate) fn update_channels(
417        &mut self,
418        payload: proto::UpdateChannels,
419        cx: &mut ModelContext<ChannelStore>,
420    ) -> Option<Task<Result<()>>> {
421        if !payload.remove_channel_invitations.is_empty() {
422            self.channel_invitations
423                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
424        }
425        for channel in payload.channel_invitations {
426            match self
427                .channel_invitations
428                .binary_search_by_key(&channel.id, |c| c.id)
429            {
430                Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
431                Err(ix) => self.channel_invitations.insert(
432                    ix,
433                    Arc::new(Channel {
434                        id: channel.id,
435                        name: channel.name,
436                    }),
437                ),
438            }
439        }
440
441        let channels_changed = !payload.channels.is_empty() || !payload.remove_channels.is_empty();
442        if channels_changed {
443            if !payload.remove_channels.is_empty() {
444                self.channels_by_id
445                    .retain(|channel_id, _| !payload.remove_channels.contains(channel_id));
446                self.channel_participants
447                    .retain(|channel_id, _| !payload.remove_channels.contains(channel_id));
448                self.channels_with_admin_privileges
449                    .retain(|channel_id| !payload.remove_channels.contains(channel_id));
450            }
451
452            for channel in payload.channels {
453                if let Some(existing_channel) = self.channels_by_id.get_mut(&channel.id) {
454                    // FIXME: We may be missing a path for this existing channel in certain cases
455                    let existing_channel = Arc::make_mut(existing_channel);
456                    existing_channel.name = channel.name;
457                    continue;
458                }
459
460                self.channels_by_id.insert(
461                    channel.id,
462                    Arc::new(Channel {
463                        id: channel.id,
464                        name: channel.name,
465                    }),
466                );
467
468                if let Some(parent_id) = channel.parent_id {
469                    let mut ix = 0;
470                    while ix < self.channel_paths.len() {
471                        let path = &self.channel_paths[ix];
472                        if path.ends_with(&[parent_id]) {
473                            let mut new_path = path.clone();
474                            new_path.push(channel.id);
475                            self.channel_paths.insert(ix + 1, new_path);
476                            ix += 1;
477                        }
478                        ix += 1;
479                    }
480                } else {
481                    self.channel_paths.push(vec![channel.id]);
482                }
483            }
484
485            self.channel_paths.sort_by(|a, b| {
486                let a = Self::channel_path_sorting_key(a, &self.channels_by_id);
487                let b = Self::channel_path_sorting_key(b, &self.channels_by_id);
488                a.cmp(b)
489            });
490            self.channel_paths.dedup();
491            self.channel_paths.retain(|path| {
492                path.iter()
493                    .all(|channel_id| self.channels_by_id.contains_key(channel_id))
494            });
495        }
496
497        for permission in payload.channel_permissions {
498            if permission.is_admin {
499                self.channels_with_admin_privileges
500                    .insert(permission.channel_id);
501            } else {
502                self.channels_with_admin_privileges
503                    .remove(&permission.channel_id);
504            }
505        }
506
507        cx.notify();
508        if payload.channel_participants.is_empty() {
509            return None;
510        }
511
512        let mut all_user_ids = Vec::new();
513        let channel_participants = payload.channel_participants;
514        for entry in &channel_participants {
515            for user_id in entry.participant_user_ids.iter() {
516                if let Err(ix) = all_user_ids.binary_search(user_id) {
517                    all_user_ids.insert(ix, *user_id);
518                }
519            }
520        }
521
522        let users = self
523            .user_store
524            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
525        Some(cx.spawn(|this, mut cx| async move {
526            let users = users.await?;
527
528            this.update(&mut cx, |this, cx| {
529                for entry in &channel_participants {
530                    let mut participants: Vec<_> = entry
531                        .participant_user_ids
532                        .iter()
533                        .filter_map(|user_id| {
534                            users
535                                .binary_search_by_key(&user_id, |user| &user.id)
536                                .ok()
537                                .map(|ix| users[ix].clone())
538                        })
539                        .collect();
540
541                    participants.sort_by_key(|u| u.id);
542
543                    this.channel_participants
544                        .insert(entry.channel_id, participants);
545                }
546
547                cx.notify();
548            });
549            anyhow::Ok(())
550        }))
551    }
552
553    fn channel_path_sorting_key<'a>(
554        path: &'a [ChannelId],
555        channels_by_id: &'a HashMap<ChannelId, Arc<Channel>>,
556    ) -> impl 'a + Iterator<Item = Option<&'a str>> {
557        path.iter()
558            .map(|id| Some(channels_by_id.get(id)?.name.as_str()))
559    }
560}