Finish optimizing channel representations and operations

Mikayla created

Change summary

Cargo.lock                                        |   1 
crates/channel/Cargo.toml                         |   1 
crates/channel/src/channel_store.rs               |  11 -
crates/channel/src/channel_store/channel_index.rs | 108 ++++++----------
crates/channel/src/channel_store_tests.rs         |   1 
crates/collab/src/db.rs                           |   5 
crates/collab/src/db/queries/channels.rs          |  54 +++++---
crates/collab/src/db/tests/channel_tests.rs       |   2 
crates/collab/src/rpc.rs                          |  37 +++--
crates/collab_ui/src/collab_panel.rs              |  28 +++
10 files changed, 131 insertions(+), 117 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -1222,6 +1222,7 @@ dependencies = [
  "serde",
  "serde_derive",
  "settings",
+ "smallvec",
  "smol",
  "sum_tree",
  "tempfile",

crates/channel/Cargo.toml 🔗

@@ -28,6 +28,7 @@ anyhow.workspace = true
 futures.workspace = true
 image = "0.23"
 lazy_static.workspace = true
+smallvec.workspace = true
 log.workspace = true
 parking_lot.workspace = true
 postage.workspace = true

crates/channel/src/channel_store.rs 🔗

@@ -805,21 +805,18 @@ impl ChannelStore {
                 }
             }
 
-            let mut index_edit = self.channel_index.bulk_edit();
-            dbg!(&index_edit);
+            let mut index = self.channel_index.bulk_insert();
             for channel in payload.channels {
-                index_edit.insert(channel)
+                index.insert(channel)
             }
 
             for edge in payload.insert_edge {
-                index_edit.insert_edge(edge.parent_id, edge.channel_id);
+                index.insert_edge(edge.channel_id, edge.parent_id);
             }
 
             for edge in payload.delete_edge {
-                index_edit.delete_edge(edge.parent_id, edge.channel_id);
+                index.delete_edge(edge.parent_id, edge.channel_id);
             }
-            drop(index_edit);
-            dbg!(&self.channel_index);
         }
 
         for permission in payload.channel_permissions {

crates/channel/src/channel_store/channel_index.rs 🔗

@@ -1,10 +1,9 @@
 use std::{ops::Deref, sync::Arc};
 
+use crate::{Channel, ChannelId};
 use collections::HashMap;
 use rpc::proto;
 
-use crate::{Channel, ChannelId};
-
 use super::ChannelPath;
 
 pub type ChannelsById = HashMap<ChannelId, Arc<Channel>>;
@@ -35,8 +34,8 @@ impl ChannelIndex {
         });
     }
 
-    pub fn bulk_edit(&mut self) -> ChannelPathsEditGuard {
-        ChannelPathsEditGuard {
+    pub fn bulk_insert(&mut self) -> ChannelPathsInsertGuard {
+        ChannelPathsInsertGuard {
             paths: &mut self.paths,
             channels_by_id: &mut self.channels_by_id,
         }
@@ -54,12 +53,12 @@ impl Deref for ChannelIndex {
 /// A guard for ensuring that the paths index maintains its sort and uniqueness
 /// invariants after a series of insertions
 #[derive(Debug)]
-pub struct ChannelPathsEditGuard<'a> {
+pub struct ChannelPathsInsertGuard<'a> {
     paths: &'a mut Vec<ChannelPath>,
     channels_by_id: &'a mut ChannelsById,
 }
 
-impl<'a> ChannelPathsEditGuard<'a> {
+impl<'a> ChannelPathsInsertGuard<'a> {
     /// Remove the given edge from this index. This will not remove the channel.
     /// If this operation would result in a dangling edge, re-insert it.
     pub fn delete_edge(&mut self, parent_id: ChannelId, channel_id: ChannelId) {
@@ -94,69 +93,50 @@ impl<'a> ChannelPathsEditGuard<'a> {
         }
     }
 
-    pub fn insert_edge(&mut self, parent_id: ChannelId, channel_id: ChannelId) {
-        debug_assert!(self.channels_by_id.contains_key(&parent_id));
-        let mut ix = 0;
-        println!("*********** INSERTING EDGE {}, {} ***********", channel_id, parent_id);
-        dbg!(&self.paths);
-        while ix < self.paths.len() {
-            let path = &self.paths[ix];
-            println!("*********");
-            dbg!(path);
+    pub fn insert_edge(&mut self, channel_id: ChannelId, parent_id: ChannelId) {
+        let mut parents = Vec::new();
+        let mut descendants = Vec::new();
+        let mut ixs_to_remove = Vec::new();
 
+        for (ix, path) in self.paths.iter().enumerate() {
+            if path
+                .windows(2)
+                .any(|window| window[0] == parent_id && window[1] == channel_id)
+            {
+                // We already have this edge in the index
+                return;
+            }
             if path.ends_with(&[parent_id]) {
-                dbg!("Appending to parent path");
-                let mut new_path = Vec::with_capacity(path.len() + 1);
-                new_path.extend_from_slice(path);
-                new_path.push(channel_id);
-                self.paths.insert(ix + 1, dbg!(ChannelPath::new(new_path.into())));
-                ix += 2;
-            } else if let Some(path_ix) = path.iter().position(|c| c == &channel_id) {
-                if path.contains(&parent_id) {
-                    dbg!("Doing nothing");
-                    ix += 1;
-                    continue;
-                }
-                if path_ix == 0 && path.len() == 1 {
-                    dbg!("Removing path that is just this");
-                    self.paths.swap_remove(ix);
-                    continue;
-                }
-                // This is the busted section rn
-                // We're trying to do this weird, unsorted context
-                // free insertion thing, but we can't insert 'parent_id',
-                // we have to _prepend_ with _parent path to_,
-                // or something like that.
-                // It's a bit busted rn, I think I need to keep this whole thing
-                // sorted now, as this is a huge mess.
-                // Basically, we want to do the exact thing we do in the
-                // server, except explicitly.
-                // Also, rethink the bulk edit abstraction, it's use may no longer
-                // be as needed with the channel names and edges seperated.
-                dbg!("Expanding path which contains");
-                let (left, right) = path.split_at(path_ix);
-                let mut new_path = Vec::with_capacity(left.len() + right.len() + 1);
-
-                /// WRONG WRONG WRONG
-                new_path.extend_from_slice(left);
-                new_path.push(parent_id);
-                /// WRONG WRONG WRONG
-
-                new_path.extend_from_slice(right);
-                if path_ix == 0 {
-                    dbg!("Replacing path that starts with this");
-                    self.paths[ix] = dbg!(ChannelPath::new(new_path.into()));
-                } else {
-                    dbg!("inserting new path");
-                    self.paths.insert(ix + 1, dbg!(ChannelPath::new(new_path.into())));
-                    ix += 1;
+                parents.push(path);
+            } else if let Some(position) = path.iter().position(|id| id == &channel_id) {
+                if position == 0 {
+                    ixs_to_remove.push(ix);
                 }
-                ix += 1;
+                descendants.push(path.split_at(position).1);
+            }
+        }
+
+        let mut new_paths = Vec::new();
+        for parent in parents.iter() {
+            if descendants.is_empty() {
+                let mut new_path = Vec::with_capacity(parent.len() + 1);
+                new_path.extend_from_slice(parent);
+                new_path.push(channel_id);
+                new_paths.push(ChannelPath(new_path.into()));
             } else {
-                dbg!("Doing nothing");
-                ix += 1;
+                for descendant in descendants.iter() {
+                    let mut new_path = Vec::with_capacity(parent.len() + descendant.len());
+                    new_path.extend_from_slice(parent);
+                    new_path.extend_from_slice(descendant);
+                    new_paths.push(ChannelPath(new_path.into()));
+                }
             }
         }
+
+        for ix in ixs_to_remove.into_iter().rev() {
+            self.paths.swap_remove(ix);
+        }
+        self.paths.extend(new_paths)
     }
 
     fn insert_root(&mut self, channel_id: ChannelId) {
@@ -164,7 +144,7 @@ impl<'a> ChannelPathsEditGuard<'a> {
     }
 }
 
-impl<'a> Drop for ChannelPathsEditGuard<'a> {
+impl<'a> Drop for ChannelPathsInsertGuard<'a> {
     fn drop(&mut self) {
         self.paths.sort_by(|a, b| {
             let a = channel_path_sorting_key(a, &self.channels_by_id);

crates/collab/src/db.rs 🔗

@@ -14,7 +14,10 @@ use collections::{BTreeMap, HashMap, HashSet};
 use dashmap::DashMap;
 use futures::StreamExt;
 use rand::{prelude::StdRng, Rng, SeedableRng};
-use rpc::{proto::{self}, ConnectionId};
+use rpc::{
+    proto::{self},
+    ConnectionId,
+};
 use sea_orm::{
     entity::prelude::*, ActiveValue, Condition, ConnectionTrait, DatabaseConnection,
     DatabaseTransaction, DbErr, FromQueryResult, IntoActiveModel, IsolationLevel, JoinType,

crates/collab/src/db/queries/channels.rs 🔗

@@ -375,10 +375,7 @@ impl Database {
             }
         }
 
-        Ok(ChannelGraph {
-            channels,
-            edges,
-        })
+        Ok(ChannelGraph { channels, edges })
     }
 
     pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
@@ -394,12 +391,16 @@ impl Database {
                 .all(&*tx)
                 .await?;
 
-            self.get_user_channels(channel_memberships, user_id, &tx).await
+            self.get_user_channels(channel_memberships, &tx).await
         })
         .await
     }
 
-    pub async fn get_channel_for_user(&self, channel_id: ChannelId, user_id: UserId) -> Result<ChannelsForUser> {
+    pub async fn get_channel_for_user(
+        &self,
+        channel_id: ChannelId,
+        user_id: UserId,
+    ) -> Result<ChannelsForUser> {
         self.transaction(|tx| async move {
             let tx = tx;
 
@@ -413,12 +414,16 @@ impl Database {
                 .all(&*tx)
                 .await?;
 
-            self.get_user_channels(channel_membership, user_id, &tx).await
+            self.get_user_channels(channel_membership, &tx).await
         })
         .await
     }
 
-    pub async fn get_user_channels(&self, channel_memberships: Vec<channel_member::Model>, user_id: UserId, tx: &DatabaseTransaction)  -> Result<ChannelsForUser> {
+    pub async fn get_user_channels(
+        &self,
+        channel_memberships: Vec<channel_member::Model>,
+        tx: &DatabaseTransaction,
+    ) -> Result<ChannelsForUser> {
         let parents_by_child_id = self
             .get_channel_descendants(channel_memberships.iter().map(|m| m.channel_id), &*tx)
             .await?;
@@ -824,9 +829,9 @@ impl Database {
         self.check_user_is_channel_admin(to, user, &*tx).await?;
 
         let to_ancestors = self.get_channel_ancestors(to, &*tx).await?;
-        let mut from_descendants = self.get_channel_descendants([channel], &*tx).await?;
+        let mut channel_descendants = self.get_channel_descendants([channel], &*tx).await?;
         for ancestor in to_ancestors {
-            if from_descendants.contains_key(&ancestor) {
+            if channel_descendants.contains_key(&ancestor) {
                 return Err(anyhow!("Cannot create a channel cycle").into());
             }
         }
@@ -853,15 +858,17 @@ impl Database {
             ],
         );
         tx.execute(channel_paths_stmt).await?;
-        for (from_id, to_ids) in from_descendants.iter().filter(|(id, _)| id != &&channel) {
-            for to_id in to_ids.iter() {
+        for (descdenant_id, descendant_parent_ids) in
+            channel_descendants.iter().filter(|(id, _)| id != &&channel)
+        {
+            for descendant_parent_id in descendant_parent_ids.iter() {
                 let channel_paths_stmt = Statement::from_sql_and_values(
                     self.pool.get_database_backend(),
                     sql,
                     [
-                        from_id.to_proto().into(),
-                        from_id.to_proto().into(),
-                        to_id.to_proto().into(),
+                        descdenant_id.to_proto().into(),
+                        descdenant_id.to_proto().into(),
+                        descendant_parent_id.to_proto().into(),
                     ],
                 );
                 tx.execute(channel_paths_stmt).await?;
@@ -883,14 +890,14 @@ impl Database {
             tx.execute(channel_paths_stmt).await?;
         }
 
-        if let Some(channel) = from_descendants.get_mut(&channel) {
+        if let Some(channel) = channel_descendants.get_mut(&channel) {
             // Remove the other parents
             channel.clear();
             channel.insert(to);
         }
 
         let channels = self
-            .get_channel_graph(from_descendants, false, &*tx)
+            .get_channel_graph(channel_descendants, false, &*tx)
             .await?;
 
         Ok(channels)
@@ -1009,8 +1016,16 @@ impl PartialEq for ChannelGraph {
         // Order independent comparison for tests
         let channels_set = self.channels.iter().collect::<HashSet<_>>();
         let other_channels_set = other.channels.iter().collect::<HashSet<_>>();
-        let edges_set = self.edges.iter().map(|edge| (edge.channel_id, edge.parent_id)).collect::<HashSet<_>>();
-        let other_edges_set = other.edges.iter().map(|edge| (edge.channel_id, edge.parent_id)).collect::<HashSet<_>>();
+        let edges_set = self
+            .edges
+            .iter()
+            .map(|edge| (edge.channel_id, edge.parent_id))
+            .collect::<HashSet<_>>();
+        let other_edges_set = other
+            .edges
+            .iter()
+            .map(|edge| (edge.channel_id, edge.parent_id))
+            .collect::<HashSet<_>>();
 
         channels_set == other_channels_set && edges_set == other_edges_set
     }
@@ -1023,7 +1038,6 @@ impl PartialEq for ChannelGraph {
     }
 }
 
-
 struct SmallSet<T>(SmallVec<[T; 1]>);
 
 impl<T> Deref for SmallSet<T> {

crates/collab/src/db/tests/channel_tests.rs 🔗

@@ -5,7 +5,7 @@ use rpc::{
 };
 
 use crate::{
-    db::{queries::channels::ChannelGraph, ChannelId, Database, NewUserParams, tests::graph},
+    db::{queries::channels::ChannelGraph, tests::graph, ChannelId, Database, NewUserParams},
     test_both_dbs,
 };
 use std::sync::Arc;

crates/collab/src/rpc.rs 🔗

@@ -3,8 +3,8 @@ mod connection_pool;
 use crate::{
     auth,
     db::{
-        self, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId,
-        ServerId, User, UserId,
+        self, ChannelId, ChannelsForUser, Database, MessageId, ProjectId, RoomId, ServerId, User,
+        UserId,
     },
     executor::Executor,
     AppState, Result,
@@ -38,8 +38,8 @@ use lazy_static::lazy_static;
 use prometheus::{register_int_gauge, IntGauge};
 use rpc::{
     proto::{
-        self, Ack, AddChannelBufferCollaborator, AnyTypedEnvelope, EntityMessage, EnvelopedMessage,
-        LiveKitConnectionInfo, RequestMessage, ChannelEdge,
+        self, Ack, AddChannelBufferCollaborator, AnyTypedEnvelope, ChannelEdge, EntityMessage,
+        EnvelopedMessage, LiveKitConnectionInfo, RequestMessage,
     },
     Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
 };
@@ -2213,17 +2213,14 @@ async fn create_channel(
 
     let update = proto::UpdateChannels {
         channels: vec![channel],
-        insert_edge: vec![
-            ChannelEdge {
-                parent_id: parent_id.to_proto(),
-                channel_id: id.to_proto(),
-            }
-        ],
+        insert_edge: vec![ChannelEdge {
+            parent_id: parent_id.to_proto(),
+            channel_id: id.to_proto(),
+        }],
         ..Default::default()
     };
 
-    let user_ids_to_notify =
-        db.get_channel_members(parent_id).await?;
+    let user_ids_to_notify = db.get_channel_members(parent_id).await?;
 
     let connection_pool = session.connection_pool().await;
     for user_id in user_ids_to_notify {
@@ -2549,10 +2546,16 @@ async fn respond_to_channel_invite(
         let result = db.get_channel_for_user(channel_id, session.user_id).await?;
         update
             .channels
-            .extend(result.channels.channels.into_iter().map(|channel| proto::Channel {
-                id: channel.id.to_proto(),
-                name: channel.name,
-            }));
+            .extend(
+                result
+                    .channels
+                    .channels
+                    .into_iter()
+                    .map(|channel| proto::Channel {
+                        id: channel.id.to_proto(),
+                        name: channel.name,
+                    }),
+            );
         update.insert_edge = result.channels.edges;
         update
             .channel_participants
@@ -2971,7 +2974,7 @@ fn build_initial_channels_update(
 ) -> proto::UpdateChannels {
     let mut update = proto::UpdateChannels::default();
 
-    for channel in channels.channels.channels{
+    for channel in channels.channels.channels {
         update.channels.push(proto::Channel {
             id: channel.id.to_proto(),
             name: channel.name,

crates/collab_ui/src/collab_panel.rs 🔗

@@ -2024,11 +2024,15 @@ impl CollabPanel {
             if let Some(channel_name) = channel_name {
                 items.push(ContextMenuItem::action(
                     format!("Move '#{}' here", channel_name),
-                    MoveChannel { to: path.channel_id() },
+                    MoveChannel {
+                        to: path.channel_id(),
+                    },
                 ));
                 items.push(ContextMenuItem::action(
                     format!("Link '#{}' here", channel_name),
-                    LinkChannel { to: path.channel_id() },
+                    LinkChannel {
+                        to: path.channel_id(),
+                    },
                 ));
                 items.push(ContextMenuItem::Separator)
             }
@@ -2046,7 +2050,12 @@ impl CollabPanel {
                         location: path.clone(),
                     },
                 ),
-                ContextMenuItem::action("Open Notes", OpenChannelBuffer { channel_id: path.channel_id() }),
+                ContextMenuItem::action(
+                    "Open Notes",
+                    OpenChannelBuffer {
+                        channel_id: path.channel_id(),
+                    },
+                ),
             ]);
 
             if self.channel_store.read(cx).is_user_admin(path.channel_id()) {
@@ -2306,7 +2315,6 @@ impl CollabPanel {
         }
 
         self.toggle_channel_collapsed(&path.clone(), cx);
-
     }
 
     fn expand_selected_channel(&mut self, _: &ExpandSelectedChannel, cx: &mut ViewContext<Self>) {
@@ -2324,11 +2332,19 @@ impl CollabPanel {
         self.toggle_channel_collapsed(path.to_owned(), cx)
     }
 
-    fn toggle_channel_collapsed_action(&mut self, action: &ToggleCollapse, cx: &mut ViewContext<Self>) {
+    fn toggle_channel_collapsed_action(
+        &mut self,
+        action: &ToggleCollapse,
+        cx: &mut ViewContext<Self>,
+    ) {
         self.toggle_channel_collapsed(&action.location, cx);
     }
 
-    fn toggle_channel_collapsed<'a>(&mut self, path: impl Into<Cow<'a, ChannelPath>>, cx: &mut ViewContext<Self>) {
+    fn toggle_channel_collapsed<'a>(
+        &mut self,
+        path: impl Into<Cow<'a, ChannelPath>>,
+        cx: &mut ViewContext<Self>,
+    ) {
         let path = path.into();
         match self.collapsed_channels.binary_search(&path) {
             Ok(ix) => {