channel_store.rs

  1use crate::channel_buffer::ChannelBuffer;
  2use anyhow::{anyhow, Result};
  3use client::{Client, Status, Subscription, User, UserId, UserStore};
  4use collections::{hash_map, HashMap, HashSet};
  5use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
  6use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  7use rpc::{proto, TypedEnvelope};
  8use std::sync::Arc;
  9use util::ResultExt;
 10
/// Numeric identifier of a channel; channel ids arriving in `proto` messages
/// are stored under this type.
pub type ChannelId = u64;
 12
/// Client-side model holding the channels, invitations, participants and
/// open channel buffers known to this app instance.
///
/// The state is mutated by `update_channels`, which is fed both by
/// `proto::UpdateChannels` messages received from the server and by the
/// responses to requests issued from this store.
pub struct ChannelStore {
    /// Every known channel, keyed by its id.
    channels_by_id: HashMap<ChannelId, Arc<Channel>>,
    /// One entry per listed channel: the ids of its ancestors followed by its
    /// own id. Kept sorted by the channel names along each path.
    channel_paths: Vec<Vec<ChannelId>>,
    /// Channels the user has been invited to, kept sorted by channel id.
    channel_invitations: Vec<Arc<Channel>>,
    /// Users reported as participants of each channel.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Channels for which the server reported an admin permission.
    channels_with_admin_privileges: HashSet<ChannelId>,
    /// `(channel, user)` pairs with a membership request (invite, removal or
    /// admin change) currently in flight.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sending half of the queue drained by the `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are open or in the process of being opened.
    opened_buffers: HashMap<ChannelId, OpenedChannelBuffer>,
    /// RPC client used for all channel requests.
    client: Arc<Client>,
    /// Store used to resolve user ids into `User` values.
    user_store: ModelHandle<UserStore>,
    /// Subscription returned when registering `handle_update_channels`;
    /// held for as long as the store exists.
    _rpc_subscription: Subscription,
    /// Task reacting to changes of the client's connection status.
    _watch_connection_status: Task<()>,
    /// Task applying queued `proto::UpdateChannels` payloads one at a time.
    _update_channels: Task<()>,
}
 28
/// A channel as tracked by [`ChannelStore`], used both for listed channels
/// and for pending invitations.
#[derive(Clone, Debug, PartialEq)]
pub struct Channel {
    /// Identifier of the channel.
    pub id: ChannelId,
    /// Name of the channel as last reported by an update.
    pub name: String,
}
 34
/// One entry of a channel's member list, as produced by
/// `ChannelStore::get_channel_member_details`.
pub struct ChannelMembership {
    /// The user this membership refers to.
    pub user: Arc<User>,
    /// Membership kind decoded from the server's response.
    pub kind: proto::channel_member::Kind,
    /// Whether the server reported this member as an admin.
    pub admin: bool,
}
 40
 41pub enum ChannelEvent {
 42    ChannelCreated(ChannelId),
 43    ChannelRenamed(ChannelId),
 44}
 45
/// Registers [`ChannelStore`] as a model entity whose emitted events are
/// [`ChannelEvent`] values.
impl Entity for ChannelStore {
    type Event = ChannelEvent;
}
 49
 50pub enum ChannelMemberStatus {
 51    Invited,
 52    Member,
 53    NotMember,
 54}
 55
/// State of a channel buffer tracked in `ChannelStore::opened_buffers`.
enum OpenedChannelBuffer {
    /// The buffer finished opening; only a weak handle is kept, so the store
    /// does not keep the buffer alive by itself.
    Open(WeakModelHandle<ChannelBuffer>),
    /// The buffer is still being opened; the shared task lets every caller
    /// await the same in-flight attempt.
    Loading(Shared<Task<Result<ModelHandle<ChannelBuffer>, Arc<anyhow::Error>>>>),
}
 60
 61impl ChannelStore {
 62    pub fn new(
 63        client: Arc<Client>,
 64        user_store: ModelHandle<UserStore>,
 65        cx: &mut ModelContext<Self>,
 66    ) -> Self {
 67        let rpc_subscription =
 68            client.add_message_handler(cx.handle(), Self::handle_update_channels);
 69
 70        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 71        let mut connection_status = client.status();
 72        let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
 73            while let Some(status) = connection_status.next().await {
 74                if !status.is_connected() {
 75                    if let Some(this) = this.upgrade(&cx) {
 76                        this.update(&mut cx, |this, cx| {
 77                            if matches!(status, Status::ConnectionLost | Status::SignedOut) {
 78                                this.handle_disconnect(cx);
 79                            } else {
 80                                this.disconnect_buffers(cx);
 81                            }
 82                        });
 83                    } else {
 84                        break;
 85                    }
 86                }
 87            }
 88        });
 89
 90        Self {
 91            channels_by_id: HashMap::default(),
 92            channel_invitations: Vec::default(),
 93            channel_paths: Vec::default(),
 94            channel_participants: Default::default(),
 95            channels_with_admin_privileges: Default::default(),
 96            outgoing_invites: Default::default(),
 97            opened_buffers: Default::default(),
 98            update_channels_tx,
 99            client,
100            user_store,
101            _rpc_subscription: rpc_subscription,
102            _watch_connection_status: watch_connection_status,
103            _update_channels: cx.spawn_weak(|this, mut cx| async move {
104                while let Some(update_channels) = update_channels_rx.next().await {
105                    if let Some(this) = this.upgrade(&cx) {
106                        let update_task = this.update(&mut cx, |this, cx| {
107                            this.update_channels(update_channels, cx)
108                        });
109                        if let Some(update_task) = update_task {
110                            update_task.await.log_err();
111                        }
112                    }
113                }
114            }),
115        }
116    }
117
118    pub fn has_children(&self, channel_id: ChannelId) -> bool {
119        self.channel_paths.iter().any(|path| {
120            if let Some(ix) = path.iter().position(|id| *id == channel_id) {
121                path.len() > ix + 1
122            } else {
123                false
124            }
125        })
126    }
127
    /// Number of channel paths, i.e. the number of entries yielded by
    /// `channels`.
    pub fn channel_count(&self) -> usize {
        self.channel_paths.len()
    }
131
132    pub fn channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
133        self.channel_paths.iter().map(move |path| {
134            let id = path.last().unwrap();
135            let channel = self.channel_for_id(*id).unwrap();
136            (path.len() - 1, channel)
137        })
138    }
139
140    pub fn channel_at_index(&self, ix: usize) -> Option<(usize, &Arc<Channel>)> {
141        let path = self.channel_paths.get(ix)?;
142        let id = path.last().unwrap();
143        let channel = self.channel_for_id(*id).unwrap();
144        Some((path.len() - 1, channel))
145    }
146
    /// Channels the user currently has a pending invitation to.
    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
        &self.channel_invitations
    }
150
    /// Looks up a listed channel by id; invitations are not searched.
    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
        self.channels_by_id.get(&channel_id)
    }
154
    /// Opens the buffer of the channel `channel_id`, reusing an already open
    /// or currently loading buffer when there is one.
    ///
    /// The returned task fails when the channel is not known to the store or
    /// when `ChannelBuffer::new` fails.
    pub fn open_channel_buffer(
        &mut self,
        channel_id: ChannelId,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ModelHandle<ChannelBuffer>>> {
        // Make sure that a given channel buffer is only opened once per
        // app instance, even if this method is called multiple times
        // with the same channel id while the first task is still running.
        let task = loop {
            match self.opened_buffers.entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenedChannelBuffer::Open(buffer) => {
                        if let Some(buffer) = buffer.upgrade(cx) {
                            // Still alive: hand out the existing buffer.
                            break Task::ready(Ok(buffer)).shared();
                        } else {
                            // The weak handle is dead; drop the stale entry and
                            // retry, which takes the `Vacant` branch below.
                            self.opened_buffers.remove(&channel_id);
                            continue;
                        }
                    }
                    // Another caller is already opening this buffer; share its task.
                    OpenedChannelBuffer::Loading(task) => break task.clone(),
                },
                hash_map::Entry::Vacant(e) => {
                    let client = self.client.clone();
                    // The error is wrapped in `Arc` so the result can be cloned
                    // for every awaiter of the shared task.
                    let task = cx
                        .spawn(|this, cx| async move {
                            let channel = this.read_with(&cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })?;

                            ChannelBuffer::new(channel, client, cx)
                                .await
                                .map_err(Arc::new)
                        })
                        .shared();
                    e.insert(OpenedChannelBuffer::Loading(task.clone()));
                    // Detached bookkeeping task: once loading finishes, replace the
                    // `Loading` entry with `Open` (or remove it on failure).
                    cx.spawn({
                        let task = task.clone();
                        |this, mut cx| async move {
                            let result = task.await;
                            this.update(&mut cx, |this, cx| match result {
                                Ok(buffer) => {
                                    // Forget the entry as soon as the buffer is released.
                                    cx.observe_release(&buffer, move |this, _, _| {
                                        this.opened_buffers.remove(&channel_id);
                                    })
                                    .detach();
                                    this.opened_buffers.insert(
                                        channel_id,
                                        OpenedChannelBuffer::Open(buffer.downgrade()),
                                    );
                                }
                                Err(error) => {
                                    log::error!("failed to open channel buffer {error:?}");
                                    this.opened_buffers.remove(&channel_id);
                                }
                            });
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // Convert the shared `Arc<anyhow::Error>` back into a plain `anyhow::Error`
        // for the caller; only the error's display text is carried over.
        cx.foreground()
            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
222
223    pub fn is_user_admin(&self, channel_id: ChannelId) -> bool {
224        self.channel_paths.iter().any(|path| {
225            if let Some(ix) = path.iter().position(|id| *id == channel_id) {
226                path[..=ix]
227                    .iter()
228                    .any(|id| self.channels_with_admin_privileges.contains(id))
229            } else {
230                false
231            }
232        })
233    }
234
235    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
236        self.channel_participants
237            .get(&channel_id)
238            .map_or(&[], |v| v.as_slice())
239    }
240
241    pub fn create_channel(
242        &self,
243        name: &str,
244        parent_id: Option<ChannelId>,
245        cx: &mut ModelContext<Self>,
246    ) -> Task<Result<ChannelId>> {
247        let client = self.client.clone();
248        let name = name.trim_start_matches("#").to_owned();
249        cx.spawn(|this, mut cx| async move {
250            let channel = client
251                .request(proto::CreateChannel { name, parent_id })
252                .await?
253                .channel
254                .ok_or_else(|| anyhow!("missing channel in response"))?;
255
256            let channel_id = channel.id;
257
258            this.update(&mut cx, |this, cx| {
259                let task = this.update_channels(
260                    proto::UpdateChannels {
261                        channels: vec![channel],
262                        ..Default::default()
263                    },
264                    cx,
265                );
266                assert!(task.is_none());
267
268                // This event is emitted because the collab panel wants to clear the pending edit state
269                // before this frame is rendered. But we can't guarantee that the collab panel's future
270                // will resolve before this flush_effects finishes. Synchronously emitting this event
271                // ensures that the collab panel will observe this creation before the frame completes
272                cx.emit(ChannelEvent::ChannelCreated(channel_id));
273            });
274
275            Ok(channel_id)
276        })
277    }
278
279    pub fn invite_member(
280        &mut self,
281        channel_id: ChannelId,
282        user_id: UserId,
283        admin: bool,
284        cx: &mut ModelContext<Self>,
285    ) -> Task<Result<()>> {
286        if !self.outgoing_invites.insert((channel_id, user_id)) {
287            return Task::ready(Err(anyhow!("invite request already in progress")));
288        }
289
290        cx.notify();
291        let client = self.client.clone();
292        cx.spawn(|this, mut cx| async move {
293            let result = client
294                .request(proto::InviteChannelMember {
295                    channel_id,
296                    user_id,
297                    admin,
298                })
299                .await;
300
301            this.update(&mut cx, |this, cx| {
302                this.outgoing_invites.remove(&(channel_id, user_id));
303                cx.notify();
304            });
305
306            result?;
307
308            Ok(())
309        })
310    }
311
312    pub fn remove_member(
313        &mut self,
314        channel_id: ChannelId,
315        user_id: u64,
316        cx: &mut ModelContext<Self>,
317    ) -> Task<Result<()>> {
318        if !self.outgoing_invites.insert((channel_id, user_id)) {
319            return Task::ready(Err(anyhow!("invite request already in progress")));
320        }
321
322        cx.notify();
323        let client = self.client.clone();
324        cx.spawn(|this, mut cx| async move {
325            let result = client
326                .request(proto::RemoveChannelMember {
327                    channel_id,
328                    user_id,
329                })
330                .await;
331
332            this.update(&mut cx, |this, cx| {
333                this.outgoing_invites.remove(&(channel_id, user_id));
334                cx.notify();
335            });
336            result?;
337            Ok(())
338        })
339    }
340
341    pub fn set_member_admin(
342        &mut self,
343        channel_id: ChannelId,
344        user_id: UserId,
345        admin: bool,
346        cx: &mut ModelContext<Self>,
347    ) -> Task<Result<()>> {
348        if !self.outgoing_invites.insert((channel_id, user_id)) {
349            return Task::ready(Err(anyhow!("member request already in progress")));
350        }
351
352        cx.notify();
353        let client = self.client.clone();
354        cx.spawn(|this, mut cx| async move {
355            let result = client
356                .request(proto::SetChannelMemberAdmin {
357                    channel_id,
358                    user_id,
359                    admin,
360                })
361                .await;
362
363            this.update(&mut cx, |this, cx| {
364                this.outgoing_invites.remove(&(channel_id, user_id));
365                cx.notify();
366            });
367
368            result?;
369            Ok(())
370        })
371    }
372
373    pub fn rename(
374        &mut self,
375        channel_id: ChannelId,
376        new_name: &str,
377        cx: &mut ModelContext<Self>,
378    ) -> Task<Result<()>> {
379        let client = self.client.clone();
380        let name = new_name.to_string();
381        cx.spawn(|this, mut cx| async move {
382            let channel = client
383                .request(proto::RenameChannel { channel_id, name })
384                .await?
385                .channel
386                .ok_or_else(|| anyhow!("missing channel in response"))?;
387            this.update(&mut cx, |this, cx| {
388                let task = this.update_channels(
389                    proto::UpdateChannels {
390                        channels: vec![channel],
391                        ..Default::default()
392                    },
393                    cx,
394                );
395                assert!(task.is_none());
396
397                // This event is emitted because the collab panel wants to clear the pending edit state
398                // before this frame is rendered. But we can't guarantee that the collab panel's future
399                // will resolve before this flush_effects finishes. Synchronously emitting this event
400                // ensures that the collab panel will observe this creation before the frame complete
401                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
402            });
403            Ok(())
404        })
405    }
406
407    pub fn respond_to_channel_invite(
408        &mut self,
409        channel_id: ChannelId,
410        accept: bool,
411    ) -> impl Future<Output = Result<()>> {
412        let client = self.client.clone();
413        async move {
414            client
415                .request(proto::RespondToChannelInvite { channel_id, accept })
416                .await?;
417            Ok(())
418        }
419    }
420
421    pub fn get_channel_member_details(
422        &self,
423        channel_id: ChannelId,
424        cx: &mut ModelContext<Self>,
425    ) -> Task<Result<Vec<ChannelMembership>>> {
426        let client = self.client.clone();
427        let user_store = self.user_store.downgrade();
428        cx.spawn(|_, mut cx| async move {
429            let response = client
430                .request(proto::GetChannelMembers { channel_id })
431                .await?;
432
433            let user_ids = response.members.iter().map(|m| m.user_id).collect();
434            let user_store = user_store
435                .upgrade(&cx)
436                .ok_or_else(|| anyhow!("user store dropped"))?;
437            let users = user_store
438                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
439                .await?;
440
441            Ok(users
442                .into_iter()
443                .zip(response.members)
444                .filter_map(|(user, member)| {
445                    Some(ChannelMembership {
446                        user,
447                        admin: member.admin,
448                        kind: proto::channel_member::Kind::from_i32(member.kind)?,
449                    })
450                })
451                .collect())
452        })
453    }
454
455    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
456        let client = self.client.clone();
457        async move {
458            client.request(proto::RemoveChannel { channel_id }).await?;
459            Ok(())
460        }
461    }
462
    /// Always returns `false`: responses to channel invitations are not
    /// tracked by the store, so the channel argument is ignored.
    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
        false
    }
466
    /// Returns `true` while a membership request for `(channel_id, user_id)`
    /// is in flight. Despite the name, this covers invites, removals and
    /// admin changes, since all three share `outgoing_invites`.
    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
        self.outgoing_invites.contains(&(channel_id, user_id))
    }
470
471    async fn handle_update_channels(
472        this: ModelHandle<Self>,
473        message: TypedEnvelope<proto::UpdateChannels>,
474        _: Arc<Client>,
475        mut cx: AsyncAppContext,
476    ) -> Result<()> {
477        this.update(&mut cx, |this, _| {
478            this.update_channels_tx
479                .unbounded_send(message.payload)
480                .unwrap();
481        });
482        Ok(())
483    }
484
485    fn handle_disconnect(&mut self, cx: &mut ModelContext<'_, ChannelStore>) {
486        self.disconnect_buffers(cx);
487        self.channels_by_id.clear();
488        self.channel_invitations.clear();
489        self.channel_participants.clear();
490        self.channels_with_admin_privileges.clear();
491        self.channel_paths.clear();
492        self.outgoing_invites.clear();
493        cx.notify();
494    }
495
496    fn disconnect_buffers(&mut self, cx: &mut ModelContext<ChannelStore>) {
497        for (_, buffer) in self.opened_buffers.drain() {
498            if let OpenedChannelBuffer::Open(buffer) = buffer {
499                if let Some(buffer) = buffer.upgrade(cx) {
500                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
501                }
502            }
503        }
504    }
505
    /// Applies an `UpdateChannels` payload to the store.
    ///
    /// Invitations, channels, channel paths and admin permissions are updated
    /// synchronously and observers are notified. If the payload lists channel
    /// participants, a task is returned that resolves the participating users
    /// through the user store and then records them; otherwise `None` is
    /// returned.
    pub(crate) fn update_channels(
        &mut self,
        payload: proto::UpdateChannels,
        cx: &mut ModelContext<ChannelStore>,
    ) -> Option<Task<Result<()>>> {
        // Invitations: drop the removed ones, then insert or rename the rest
        // while keeping the list sorted by channel id.
        if !payload.remove_channel_invitations.is_empty() {
            self.channel_invitations
                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
        }
        for channel in payload.channel_invitations {
            match self
                .channel_invitations
                .binary_search_by_key(&channel.id, |c| c.id)
            {
                Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
                Err(ix) => self.channel_invitations.insert(
                    ix,
                    Arc::new(Channel {
                        id: channel.id,
                        name: channel.name,
                    }),
                ),
            }
        }

        let channels_changed = !payload.channels.is_empty() || !payload.remove_channels.is_empty();
        if channels_changed {
            if !payload.remove_channels.is_empty() {
                // Forget removed channels everywhere except `channel_paths`,
                // which is pruned after sorting below.
                self.channels_by_id
                    .retain(|channel_id, _| !payload.remove_channels.contains(channel_id));
                self.channel_participants
                    .retain(|channel_id, _| !payload.remove_channels.contains(channel_id));
                self.channels_with_admin_privileges
                    .retain(|channel_id| !payload.remove_channels.contains(channel_id));

                // Disconnect open buffers of removed channels; entries that are
                // still loading are simply dropped from the map.
                for channel_id in &payload.remove_channels {
                    let channel_id = *channel_id;
                    if let Some(OpenedChannelBuffer::Open(buffer)) =
                        self.opened_buffers.remove(&channel_id)
                    {
                        if let Some(buffer) = buffer.upgrade(cx) {
                            buffer.update(cx, ChannelBuffer::disconnect);
                        }
                    }
                }
            }

            for channel_proto in payload.channels {
                if let Some(existing_channel) = self.channels_by_id.get_mut(&channel_proto.id) {
                    // Known channel: only its name is refreshed; its paths stay as they are.
                    Arc::make_mut(existing_channel).name = channel_proto.name;
                } else {
                    let channel = Arc::new(Channel {
                        id: channel_proto.id,
                        name: channel_proto.name,
                    });
                    self.channels_by_id.insert(channel.id, channel.clone());

                    if let Some(parent_id) = channel_proto.parent_id {
                        // Add one path for the new channel after every existing path
                        // that ends at its parent.
                        let mut ix = 0;
                        while ix < self.channel_paths.len() {
                            let path = &self.channel_paths[ix];
                            if path.ends_with(&[parent_id]) {
                                let mut new_path = path.clone();
                                new_path.push(channel.id);
                                self.channel_paths.insert(ix + 1, new_path);
                                // Skip over the path that was just inserted.
                                ix += 1;
                            }
                            ix += 1;
                        }
                    } else {
                        // No parent: the channel is a root with a single-element path.
                        self.channel_paths.push(vec![channel.id]);
                    }
                }
            }

            // Order paths by the channel names along them, drop adjacent
            // duplicates, and prune paths that mention a channel no longer stored.
            self.channel_paths.sort_by(|a, b| {
                let a = Self::channel_path_sorting_key(a, &self.channels_by_id);
                let b = Self::channel_path_sorting_key(b, &self.channels_by_id);
                a.cmp(b)
            });
            self.channel_paths.dedup();
            self.channel_paths.retain(|path| {
                path.iter()
                    .all(|channel_id| self.channels_by_id.contains_key(channel_id))
            });
        }

        for permission in payload.channel_permissions {
            if permission.is_admin {
                self.channels_with_admin_privileges
                    .insert(permission.channel_id);
            } else {
                self.channels_with_admin_privileges
                    .remove(&permission.channel_id);
            }
        }

        cx.notify();
        if payload.channel_participants.is_empty() {
            return None;
        }

        // Collect the distinct participant ids, in ascending order, so they
        // can be fetched from the user store in one call.
        let mut all_user_ids = Vec::new();
        let channel_participants = payload.channel_participants;
        for entry in &channel_participants {
            for user_id in entry.participant_user_ids.iter() {
                if let Err(ix) = all_user_ids.binary_search(user_id) {
                    all_user_ids.insert(ix, *user_id);
                }
            }
        }

        let users = self
            .user_store
            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
        Some(cx.spawn(|this, mut cx| async move {
            let users = users.await?;

            this.update(&mut cx, |this, cx| {
                for entry in &channel_participants {
                    // NOTE(review): the binary search assumes `get_users` returns the
                    // users ordered by id (the order they were requested in) — confirm.
                    // Ids that are not found are silently skipped.
                    let mut participants: Vec<_> = entry
                        .participant_user_ids
                        .iter()
                        .filter_map(|user_id| {
                            users
                                .binary_search_by_key(&user_id, |user| &user.id)
                                .ok()
                                .map(|ix| users[ix].clone())
                        })
                        .collect();

                    participants.sort_by_key(|u| u.id);

                    // Replaces whatever was previously recorded for this channel.
                    this.channel_participants
                        .insert(entry.channel_id, participants);
                }

                cx.notify();
            });
            anyhow::Ok(())
        }))
    }
648
649    fn channel_path_sorting_key<'a>(
650        path: &'a [ChannelId],
651        channels_by_id: &'a HashMap<ChannelId, Arc<Channel>>,
652    ) -> impl 'a + Iterator<Item = Option<&'a str>> {
653        path.iter()
654            .map(|id| Some(channels_by_id.get(id)?.name.as_str()))
655    }
656}