channel_store.rs

  1use crate::Status;
  2use crate::{Client, Subscription, User, UserStore};
  3use anyhow::anyhow;
  4use anyhow::Result;
  5use collections::HashMap;
  6use collections::HashSet;
  7use futures::Future;
  8use futures::StreamExt;
  9use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, Task};
 10use rpc::{proto, TypedEnvelope};
 11use std::sync::Arc;
 12
 13pub type ChannelId = u64;
 14pub type UserId = u64;
 15
 16pub struct ChannelStore {
 17    channels: Vec<Arc<Channel>>,
 18    channel_invitations: Vec<Arc<Channel>>,
 19    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
 20    channels_with_admin_privileges: HashSet<ChannelId>,
 21    outgoing_invites: HashSet<(ChannelId, UserId)>,
 22    client: Arc<Client>,
 23    user_store: ModelHandle<UserStore>,
 24    _rpc_subscription: Subscription,
 25    _watch_connection_status: Task<()>,
 26}
 27
 28#[derive(Clone, Debug, PartialEq)]
 29pub struct Channel {
 30    pub id: ChannelId,
 31    pub name: String,
 32    pub parent_id: Option<ChannelId>,
 33    pub depth: usize,
 34}
 35
 36pub struct ChannelMembership {
 37    pub user: Arc<User>,
 38    pub kind: proto::channel_member::Kind,
 39    pub admin: bool,
 40}
 41
 42pub enum ChannelEvent {
 43    ChannelCreated(ChannelId),
 44    ChannelRenamed(ChannelId),
 45}
 46
 47impl Entity for ChannelStore {
 48    type Event = ChannelEvent;
 49}
 50
 51pub enum ChannelMemberStatus {
 52    Invited,
 53    Member,
 54    NotMember,
 55}
 56
 57impl ChannelStore {
 58    pub fn new(
 59        client: Arc<Client>,
 60        user_store: ModelHandle<UserStore>,
 61        cx: &mut ModelContext<Self>,
 62    ) -> Self {
 63        let rpc_subscription =
 64            client.add_message_handler(cx.handle(), Self::handle_update_channels);
 65
 66        let mut connection_status = client.status();
 67        let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
 68            while let Some(status) = connection_status.next().await {
 69                if matches!(status, Status::ConnectionLost | Status::SignedOut) {
 70                    if let Some(this) = this.upgrade(&cx) {
 71                        this.update(&mut cx, |this, cx| {
 72                            this.channels.clear();
 73                            this.channel_invitations.clear();
 74                            this.channel_participants.clear();
 75                            this.channels_with_admin_privileges.clear();
 76                            this.outgoing_invites.clear();
 77                            cx.notify();
 78                        });
 79                    } else {
 80                        break;
 81                    }
 82                }
 83            }
 84        });
 85        Self {
 86            channels: vec![],
 87            channel_invitations: vec![],
 88            channel_participants: Default::default(),
 89            channels_with_admin_privileges: Default::default(),
 90            outgoing_invites: Default::default(),
 91            client,
 92            user_store,
 93            _rpc_subscription: rpc_subscription,
 94            _watch_connection_status: watch_connection_status,
 95        }
 96    }
 97
 98    pub fn channels(&self) -> &[Arc<Channel>] {
 99        &self.channels
100    }
101
102    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
103        &self.channel_invitations
104    }
105
106    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<Arc<Channel>> {
107        self.channels.iter().find(|c| c.id == channel_id).cloned()
108    }
109
110    pub fn is_user_admin(&self, mut channel_id: ChannelId) -> bool {
111        loop {
112            if self.channels_with_admin_privileges.contains(&channel_id) {
113                return true;
114            }
115            if let Some(channel) = self.channel_for_id(channel_id) {
116                if let Some(parent_id) = channel.parent_id {
117                    channel_id = parent_id;
118                    continue;
119                }
120            }
121            return false;
122        }
123    }
124
125    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
126        self.channel_participants
127            .get(&channel_id)
128            .map_or(&[], |v| v.as_slice())
129    }
130
131    pub fn create_channel(
132        &self,
133        name: &str,
134        parent_id: Option<ChannelId>,
135        cx: &mut ModelContext<Self>,
136    ) -> Task<Result<ChannelId>> {
137        let client = self.client.clone();
138        let name = name.trim_start_matches("#").to_owned();
139        cx.spawn(|this, mut cx| async move {
140            let channel = client
141                .request(proto::CreateChannel { name, parent_id })
142                .await?
143                .channel
144                .ok_or_else(|| anyhow!("missing channel in response"))?;
145
146            let channel_id = channel.id;
147
148            this.update(&mut cx, |this, cx| {
149                this.update_channels(
150                    proto::UpdateChannels {
151                        channels: vec![channel],
152                        ..Default::default()
153                    },
154                    cx,
155                );
156
157                // This event is emitted because the collab panel wants to clear the pending edit state
158                // before this frame is rendered. But we can't guarantee that the collab panel's future
159                // will resolve before this flush_effects finishes. Synchronously emitting this event
160                // ensures that the collab panel will observe this creation before the frame completes
161                cx.emit(ChannelEvent::ChannelCreated(channel_id));
162            });
163
164            Ok(channel_id)
165        })
166    }
167
168    pub fn invite_member(
169        &mut self,
170        channel_id: ChannelId,
171        user_id: UserId,
172        admin: bool,
173        cx: &mut ModelContext<Self>,
174    ) -> Task<Result<()>> {
175        if !self.outgoing_invites.insert((channel_id, user_id)) {
176            return Task::ready(Err(anyhow!("invite request already in progress")));
177        }
178
179        cx.notify();
180        let client = self.client.clone();
181        cx.spawn(|this, mut cx| async move {
182            let result = client
183                .request(proto::InviteChannelMember {
184                    channel_id,
185                    user_id,
186                    admin,
187                })
188                .await;
189
190            this.update(&mut cx, |this, cx| {
191                this.outgoing_invites.remove(&(channel_id, user_id));
192                cx.notify();
193            });
194
195            result?;
196
197            Ok(())
198        })
199    }
200
201    pub fn remove_member(
202        &mut self,
203        channel_id: ChannelId,
204        user_id: u64,
205        cx: &mut ModelContext<Self>,
206    ) -> Task<Result<()>> {
207        if !self.outgoing_invites.insert((channel_id, user_id)) {
208            return Task::ready(Err(anyhow!("invite request already in progress")));
209        }
210
211        cx.notify();
212        let client = self.client.clone();
213        cx.spawn(|this, mut cx| async move {
214            let result = client
215                .request(proto::RemoveChannelMember {
216                    channel_id,
217                    user_id,
218                })
219                .await;
220
221            this.update(&mut cx, |this, cx| {
222                this.outgoing_invites.remove(&(channel_id, user_id));
223                cx.notify();
224            });
225            result?;
226            Ok(())
227        })
228    }
229
230    pub fn set_member_admin(
231        &mut self,
232        channel_id: ChannelId,
233        user_id: UserId,
234        admin: bool,
235        cx: &mut ModelContext<Self>,
236    ) -> Task<Result<()>> {
237        if !self.outgoing_invites.insert((channel_id, user_id)) {
238            return Task::ready(Err(anyhow!("member request already in progress")));
239        }
240
241        cx.notify();
242        let client = self.client.clone();
243        cx.spawn(|this, mut cx| async move {
244            let result = client
245                .request(proto::SetChannelMemberAdmin {
246                    channel_id,
247                    user_id,
248                    admin,
249                })
250                .await;
251
252            this.update(&mut cx, |this, cx| {
253                this.outgoing_invites.remove(&(channel_id, user_id));
254                cx.notify();
255            });
256
257            result?;
258            Ok(())
259        })
260    }
261
262    pub fn rename(
263        &mut self,
264        channel_id: ChannelId,
265        new_name: &str,
266        cx: &mut ModelContext<Self>,
267    ) -> Task<Result<()>> {
268        let client = self.client.clone();
269        let name = new_name.to_string();
270        cx.spawn(|this, mut cx| async move {
271            let channel = client
272                .request(proto::RenameChannel { channel_id, name })
273                .await?
274                .channel
275                .ok_or_else(|| anyhow!("missing channel in response"))?;
276            this.update(&mut cx, |this, cx| {
277                this.update_channels(
278                    proto::UpdateChannels {
279                        channels: vec![channel],
280                        ..Default::default()
281                    },
282                    cx,
283                );
284
285                // This event is emitted because the collab panel wants to clear the pending edit state
286                // before this frame is rendered. But we can't guarantee that the collab panel's future
287                // will resolve before this flush_effects finishes. Synchronously emitting this event
288                // ensures that the collab panel will observe this creation before the frame complete
289                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
290            });
291            Ok(())
292        })
293    }
294
295    pub fn respond_to_channel_invite(
296        &mut self,
297        channel_id: ChannelId,
298        accept: bool,
299    ) -> impl Future<Output = Result<()>> {
300        let client = self.client.clone();
301        async move {
302            client
303                .request(proto::RespondToChannelInvite { channel_id, accept })
304                .await?;
305            Ok(())
306        }
307    }
308
309    pub fn get_channel_member_details(
310        &self,
311        channel_id: ChannelId,
312        cx: &mut ModelContext<Self>,
313    ) -> Task<Result<Vec<ChannelMembership>>> {
314        let client = self.client.clone();
315        let user_store = self.user_store.downgrade();
316        cx.spawn(|_, mut cx| async move {
317            let response = client
318                .request(proto::GetChannelMembers { channel_id })
319                .await?;
320
321            let user_ids = response.members.iter().map(|m| m.user_id).collect();
322            let user_store = user_store
323                .upgrade(&cx)
324                .ok_or_else(|| anyhow!("user store dropped"))?;
325            let users = user_store
326                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
327                .await?;
328
329            Ok(users
330                .into_iter()
331                .zip(response.members)
332                .filter_map(|(user, member)| {
333                    Some(ChannelMembership {
334                        user,
335                        admin: member.admin,
336                        kind: proto::channel_member::Kind::from_i32(member.kind)?,
337                    })
338                })
339                .collect())
340        })
341    }
342
343    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
344        let client = self.client.clone();
345        async move {
346            client.request(proto::RemoveChannel { channel_id }).await?;
347            Ok(())
348        }
349    }
350
351    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
352        false
353    }
354
355    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
356        self.outgoing_invites.contains(&(channel_id, user_id))
357    }
358
359    async fn handle_update_channels(
360        this: ModelHandle<Self>,
361        message: TypedEnvelope<proto::UpdateChannels>,
362        _: Arc<Client>,
363        mut cx: AsyncAppContext,
364    ) -> Result<()> {
365        this.update(&mut cx, |this, cx| {
366            this.update_channels(message.payload, cx);
367        });
368        Ok(())
369    }
370
371    pub(crate) fn update_channels(
372        &mut self,
373        payload: proto::UpdateChannels,
374        cx: &mut ModelContext<ChannelStore>,
375    ) {
376        self.channels
377            .retain(|channel| !payload.remove_channels.contains(&channel.id));
378        self.channel_invitations
379            .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
380        self.channel_participants
381            .retain(|channel_id, _| !payload.remove_channels.contains(channel_id));
382        self.channels_with_admin_privileges
383            .retain(|channel_id| !payload.remove_channels.contains(channel_id));
384
385        for channel in payload.channel_invitations {
386            if let Some(existing_channel) = self
387                .channel_invitations
388                .iter_mut()
389                .find(|c| c.id == channel.id)
390            {
391                let existing_channel = Arc::make_mut(existing_channel);
392                existing_channel.name = channel.name;
393                continue;
394            }
395
396            self.channel_invitations.insert(
397                0,
398                Arc::new(Channel {
399                    id: channel.id,
400                    name: channel.name,
401                    parent_id: None,
402                    depth: 0,
403                }),
404            );
405        }
406
407        for channel in payload.channels {
408            if let Some(existing_channel) = self.channels.iter_mut().find(|c| c.id == channel.id) {
409                let existing_channel = Arc::make_mut(existing_channel);
410                existing_channel.name = channel.name;
411                continue;
412            }
413
414            if let Some(parent_id) = channel.parent_id {
415                if let Some(ix) = self.channels.iter().position(|c| c.id == parent_id) {
416                    let parent_channel = &self.channels[ix];
417                    let depth = parent_channel.depth + 1;
418                    self.channels.insert(
419                        ix + 1,
420                        Arc::new(Channel {
421                            id: channel.id,
422                            name: channel.name,
423                            parent_id: Some(parent_id),
424                            depth,
425                        }),
426                    );
427                }
428            } else {
429                self.channels.insert(
430                    0,
431                    Arc::new(Channel {
432                        id: channel.id,
433                        name: channel.name,
434                        parent_id: None,
435                        depth: 0,
436                    }),
437                );
438            }
439        }
440
441        for permission in payload.channel_permissions {
442            if permission.is_admin {
443                self.channels_with_admin_privileges
444                    .insert(permission.channel_id);
445            } else {
446                self.channels_with_admin_privileges
447                    .remove(&permission.channel_id);
448            }
449        }
450
451        let mut all_user_ids = Vec::new();
452        let channel_participants = payload.channel_participants;
453        for entry in &channel_participants {
454            for user_id in entry.participant_user_ids.iter() {
455                if let Err(ix) = all_user_ids.binary_search(user_id) {
456                    all_user_ids.insert(ix, *user_id);
457                }
458            }
459        }
460
461        // TODO: Race condition if an update channels messages comes in while resolving avatars
462        let users = self
463            .user_store
464            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
465        cx.spawn(|this, mut cx| async move {
466            let users = users.await?;
467
468            this.update(&mut cx, |this, cx| {
469                for entry in &channel_participants {
470                    let mut participants: Vec<_> = entry
471                        .participant_user_ids
472                        .iter()
473                        .filter_map(|user_id| {
474                            users
475                                .binary_search_by_key(&user_id, |user| &user.id)
476                                .ok()
477                                .map(|ix| users[ix].clone())
478                        })
479                        .collect();
480
481                    participants.sort_by_key(|u| u.id);
482
483                    this.channel_participants
484                        .insert(entry.channel_id, participants);
485                }
486
487                cx.notify();
488            });
489            anyhow::Ok(())
490        })
491        .detach();
492
493        cx.notify();
494    }
495}