Add channel2 crate

Max Brunsfeld and Marshall created

Co-authored-by: Marshall <marshall@zed.dev>

Change summary

Cargo.lock                                         |   37 
Cargo.toml                                         |    1 
crates/channel2/Cargo.toml                         |   54 
crates/channel2/src/channel2.rs                    |   23 
crates/channel2/src/channel_buffer.rs              |  257 ++++
crates/channel2/src/channel_chat.rs                |  647 ++++++++++
crates/channel2/src/channel_store.rs               | 1021 ++++++++++++++++
crates/channel2/src/channel_store/channel_index.rs |  184 ++
crates/channel2/src/channel_store_tests.rs         |  380 +++++
crates/client2/src/user.rs                         |   33 
crates/gpui2/src/app/test_context.rs               |   19 
crates/rpc2/proto/zed.proto                        |  275 ++-
crates/rpc2/src/proto.rs                           |  186 +-
13 files changed, 2,899 insertions(+), 218 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -1347,6 +1347,43 @@ dependencies = [
  "uuid 1.4.1",
 ]
 
+[[package]]
+name = "channel2"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "client2",
+ "clock",
+ "collections",
+ "db2",
+ "feature_flags2",
+ "futures 0.3.28",
+ "gpui2",
+ "image",
+ "language2",
+ "lazy_static",
+ "log",
+ "parking_lot 0.11.2",
+ "postage",
+ "rand 0.8.5",
+ "rpc2",
+ "schemars",
+ "serde",
+ "serde_derive",
+ "settings2",
+ "smallvec",
+ "smol",
+ "sum_tree",
+ "tempfile",
+ "text",
+ "thiserror",
+ "time",
+ "tiny_http",
+ "url",
+ "util",
+ "uuid 1.4.1",
+]
+
 [[package]]
 name = "chrono"
 version = "0.4.31"

Cargo.toml 🔗

@@ -10,6 +10,7 @@ members = [
     "crates/call",
     "crates/call2",
     "crates/channel",
+    "crates/channel2",
     "crates/cli",
     "crates/client",
     "crates/client2",

crates/channel2/Cargo.toml 🔗

@@ -0,0 +1,54 @@
+[package]
+name = "channel2"
+version = "0.1.0"
+edition = "2021"
+publish = false
+
+[lib]
+path = "src/channel2.rs"
+doctest = false
+
+[features]
+test-support = ["collections/test-support", "gpui2/test-support", "rpc2/test-support"]
+
+[dependencies]
+client2 = { path = "../client2" }
+collections = { path = "../collections" }
+db2 = { path = "../db2" }
+gpui2 = { path = "../gpui2" }
+util = { path = "../util" }
+rpc2 = { path = "../rpc2" }
+text = { path = "../text" }
+language2 = { path = "../language2" }
+settings2 = { path = "../settings2" }
+feature_flags2 = { path = "../feature_flags2" }
+sum_tree = { path = "../sum_tree" }
+clock = { path = "../clock" }
+
+anyhow.workspace = true
+futures.workspace = true
+image = "0.23"
+lazy_static.workspace = true
+smallvec.workspace = true
+log.workspace = true
+parking_lot.workspace = true
+postage.workspace = true
+rand.workspace = true
+schemars.workspace = true
+smol.workspace = true
+thiserror.workspace = true
+time.workspace = true
+tiny_http = "0.8"
+uuid.workspace = true
+url = "2.2"
+serde.workspace = true
+serde_derive.workspace = true
+tempfile = "3"
+
+[dev-dependencies]
+collections = { path = "../collections", features = ["test-support"] }
+gpui2 = { path = "../gpui2", features = ["test-support"] }
+rpc2 = { path = "../rpc2", features = ["test-support"] }
+client2 = { path = "../client2", features = ["test-support"] }
+settings2 = { path = "../settings2", features = ["test-support"] }
+util = { path = "../util", features = ["test-support"] }

crates/channel2/src/channel2.rs 🔗

@@ -0,0 +1,23 @@
+mod channel_buffer;
+mod channel_chat;
+mod channel_store;
+
+use client2::{Client, UserStore};
+use gpui2::{AppContext, Model};
+use std::sync::Arc;
+
+pub use channel_buffer::{ChannelBuffer, ChannelBufferEvent, ACKNOWLEDGE_DEBOUNCE_INTERVAL};
+pub use channel_chat::{
+    mentions_to_proto, ChannelChat, ChannelChatEvent, ChannelMessage, ChannelMessageId,
+    MessageParams,
+};
+pub use channel_store::{Channel, ChannelEvent, ChannelId, ChannelMembership, ChannelStore};
+
+#[cfg(test)]
+mod channel_store_tests;
+
+pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
+    channel_store::init(client, user_store, cx);
+    channel_buffer::init(client);
+    channel_chat::init(client);
+}

crates/channel2/src/channel_buffer.rs 🔗

@@ -0,0 +1,257 @@
+use crate::{Channel, ChannelId, ChannelStore};
+use anyhow::Result;
+use client2::{Client, Collaborator, UserStore};
+use collections::HashMap;
+use gpui2::{AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task};
+use language2::proto::serialize_version;
+use rpc2::{
+    proto::{self, PeerId},
+    TypedEnvelope,
+};
+use std::{sync::Arc, time::Duration};
+use util::ResultExt;
+
+pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250);
+
+pub(crate) fn init(client: &Arc<Client>) {
+    client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
+    client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborators);
+}
+
+pub struct ChannelBuffer {
+    pub channel_id: ChannelId,
+    connected: bool,
+    collaborators: HashMap<PeerId, Collaborator>,
+    user_store: Model<UserStore>,
+    channel_store: Model<ChannelStore>,
+    buffer: Model<language2::Buffer>,
+    buffer_epoch: u64,
+    client: Arc<Client>,
+    subscription: Option<client2::Subscription>,
+    acknowledge_task: Option<Task<Result<()>>>,
+}
+
+pub enum ChannelBufferEvent {
+    CollaboratorsChanged,
+    Disconnected,
+    BufferEdited,
+    ChannelChanged,
+}
+
+impl EventEmitter for ChannelBuffer {
+    type Event = ChannelBufferEvent;
+}
+
+impl ChannelBuffer {
+    pub(crate) async fn new(
+        channel: Arc<Channel>,
+        client: Arc<Client>,
+        user_store: Model<UserStore>,
+        channel_store: Model<ChannelStore>,
+        mut cx: AsyncAppContext,
+    ) -> Result<Model<Self>> {
+        let response = client
+            .request(proto::JoinChannelBuffer {
+                channel_id: channel.id,
+            })
+            .await?;
+
+        let base_text = response.base_text;
+        let operations = response
+            .operations
+            .into_iter()
+            .map(language2::proto::deserialize_operation)
+            .collect::<Result<Vec<_>, _>>()?;
+
+        let buffer = cx.build_model(|_| {
+            language2::Buffer::remote(response.buffer_id, response.replica_id as u16, base_text)
+        })?;
+        buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))??;
+
+        let subscription = client.subscribe_to_entity(channel.id)?;
+
+        anyhow::Ok(cx.build_model(|cx| {
+            cx.subscribe(&buffer, Self::on_buffer_update).detach();
+            cx.on_release(Self::release).detach();
+            let mut this = Self {
+                buffer,
+                buffer_epoch: response.epoch,
+                client,
+                connected: true,
+                collaborators: Default::default(),
+                acknowledge_task: None,
+                channel_id: channel.id,
+                subscription: Some(subscription.set_model(&cx.handle(), &mut cx.to_async())),
+                user_store,
+                channel_store,
+            };
+            this.replace_collaborators(response.collaborators, cx);
+            this
+        })?)
+    }
+
+    fn release(&mut self, _: &mut AppContext) {
+        if self.connected {
+            if let Some(task) = self.acknowledge_task.take() {
+                task.detach();
+            }
+            self.client
+                .send(proto::LeaveChannelBuffer {
+                    channel_id: self.channel_id,
+                })
+                .log_err();
+        }
+    }
+
+    pub fn remote_id(&self, cx: &AppContext) -> u64 {
+        self.buffer.read(cx).remote_id()
+    }
+
+    pub fn user_store(&self) -> &Model<UserStore> {
+        &self.user_store
+    }
+
+    pub(crate) fn replace_collaborators(
+        &mut self,
+        collaborators: Vec<proto::Collaborator>,
+        cx: &mut ModelContext<Self>,
+    ) {
+        let mut new_collaborators = HashMap::default();
+        for collaborator in collaborators {
+            if let Ok(collaborator) = Collaborator::from_proto(collaborator) {
+                new_collaborators.insert(collaborator.peer_id, collaborator);
+            }
+        }
+
+        for (_, old_collaborator) in &self.collaborators {
+            if !new_collaborators.contains_key(&old_collaborator.peer_id) {
+                self.buffer.update(cx, |buffer, cx| {
+                    buffer.remove_peer(old_collaborator.replica_id as u16, cx)
+                });
+            }
+        }
+        self.collaborators = new_collaborators;
+        cx.emit(ChannelBufferEvent::CollaboratorsChanged);
+        cx.notify();
+    }
+
+    async fn handle_update_channel_buffer(
+        this: Model<Self>,
+        update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
+        _: Arc<Client>,
+        mut cx: AsyncAppContext,
+    ) -> Result<()> {
+        let ops = update_channel_buffer
+            .payload
+            .operations
+            .into_iter()
+            .map(language2::proto::deserialize_operation)
+            .collect::<Result<Vec<_>, _>>()?;
+
+        this.update(&mut cx, |this, cx| {
+            cx.notify();
+            this.buffer
+                .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
+        })??;
+
+        Ok(())
+    }
+
+    async fn handle_update_channel_buffer_collaborators(
+        this: Model<Self>,
+        message: TypedEnvelope<proto::UpdateChannelBufferCollaborators>,
+        _: Arc<Client>,
+        mut cx: AsyncAppContext,
+    ) -> Result<()> {
+        this.update(&mut cx, |this, cx| {
+            this.replace_collaborators(message.payload.collaborators, cx);
+            cx.emit(ChannelBufferEvent::CollaboratorsChanged);
+            cx.notify();
+        })
+    }
+
+    fn on_buffer_update(
+        &mut self,
+        _: Model<language2::Buffer>,
+        event: &language2::Event,
+        cx: &mut ModelContext<Self>,
+    ) {
+        match event {
+            language2::Event::Operation(operation) => {
+                let operation = language2::proto::serialize_operation(operation);
+                self.client
+                    .send(proto::UpdateChannelBuffer {
+                        channel_id: self.channel_id,
+                        operations: vec![operation],
+                    })
+                    .log_err();
+            }
+            language2::Event::Edited => {
+                cx.emit(ChannelBufferEvent::BufferEdited);
+            }
+            _ => {}
+        }
+    }
+
+    pub fn acknowledge_buffer_version(&mut self, cx: &mut ModelContext<'_, ChannelBuffer>) {
+        let buffer = self.buffer.read(cx);
+        let version = buffer.version();
+        let buffer_id = buffer.remote_id();
+        let client = self.client.clone();
+        let epoch = self.epoch();
+
+        self.acknowledge_task = Some(cx.spawn(move |_, cx| async move {
+            cx.executor().timer(ACKNOWLEDGE_DEBOUNCE_INTERVAL).await;
+            client
+                .send(proto::AckBufferOperation {
+                    buffer_id,
+                    epoch,
+                    version: serialize_version(&version),
+                })
+                .ok();
+            Ok(())
+        }));
+    }
+
+    pub fn epoch(&self) -> u64 {
+        self.buffer_epoch
+    }
+
+    pub fn buffer(&self) -> Model<language2::Buffer> {
+        self.buffer.clone()
+    }
+
+    pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
+        &self.collaborators
+    }
+
+    pub fn channel(&self, cx: &AppContext) -> Option<Arc<Channel>> {
+        self.channel_store
+            .read(cx)
+            .channel_for_id(self.channel_id)
+            .cloned()
+    }
+
+    pub(crate) fn disconnect(&mut self, cx: &mut ModelContext<Self>) {
+        log::info!("channel buffer {} disconnected", self.channel_id);
+        if self.connected {
+            self.connected = false;
+            self.subscription.take();
+            cx.emit(ChannelBufferEvent::Disconnected);
+            cx.notify()
+        }
+    }
+
+    pub(crate) fn channel_changed(&mut self, cx: &mut ModelContext<Self>) {
+        cx.emit(ChannelBufferEvent::ChannelChanged);
+        cx.notify()
+    }
+
+    pub fn is_connected(&self) -> bool {
+        self.connected
+    }
+
+    pub fn replica_id(&self, cx: &AppContext) -> u16 {
+        self.buffer.read(cx).replica_id()
+    }
+}

crates/channel2/src/channel_chat.rs 🔗

@@ -0,0 +1,647 @@
+use crate::{Channel, ChannelId, ChannelStore};
+use anyhow::{anyhow, Result};
+use client2::{
+    proto,
+    user::{User, UserStore},
+    Client, Subscription, TypedEnvelope, UserId,
+};
+use futures::lock::Mutex;
+use gpui2::{AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task};
+use rand::prelude::*;
+use std::{
+    collections::HashSet,
+    mem,
+    ops::{ControlFlow, Range},
+    sync::Arc,
+};
+use sum_tree::{Bias, SumTree};
+use time::OffsetDateTime;
+use util::{post_inc, ResultExt as _, TryFutureExt};
+
+pub struct ChannelChat {
+    pub channel_id: ChannelId,
+    messages: SumTree<ChannelMessage>,
+    acknowledged_message_ids: HashSet<u64>,
+    channel_store: Model<ChannelStore>,
+    loaded_all_messages: bool,
+    last_acknowledged_id: Option<u64>,
+    next_pending_message_id: usize,
+    user_store: Model<UserStore>,
+    rpc: Arc<Client>,
+    outgoing_messages_lock: Arc<Mutex<()>>,
+    rng: StdRng,
+    _subscription: Subscription,
+}
+
+#[derive(Debug, PartialEq, Eq)]
+pub struct MessageParams {
+    pub text: String,
+    pub mentions: Vec<(Range<usize>, UserId)>,
+}
+
+#[derive(Clone, Debug)]
+pub struct ChannelMessage {
+    pub id: ChannelMessageId,
+    pub body: String,
+    pub timestamp: OffsetDateTime,
+    pub sender: Arc<User>,
+    pub nonce: u128,
+    pub mentions: Vec<(Range<usize>, UserId)>,
+}
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub enum ChannelMessageId {
+    Saved(u64),
+    Pending(usize),
+}
+
+#[derive(Clone, Debug, Default)]
+pub struct ChannelMessageSummary {
+    max_id: ChannelMessageId,
+    count: usize,
+}
+
+#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
+struct Count(usize);
+
+#[derive(Clone, Debug, PartialEq)]
+pub enum ChannelChatEvent {
+    MessagesUpdated {
+        old_range: Range<usize>,
+        new_count: usize,
+    },
+    NewMessage {
+        channel_id: ChannelId,
+        message_id: u64,
+    },
+}
+
+impl EventEmitter for ChannelChat {
+    type Event = ChannelChatEvent;
+}
+pub fn init(client: &Arc<Client>) {
+    client.add_model_message_handler(ChannelChat::handle_message_sent);
+    client.add_model_message_handler(ChannelChat::handle_message_removed);
+}
+
+impl ChannelChat {
+    pub async fn new(
+        channel: Arc<Channel>,
+        channel_store: Model<ChannelStore>,
+        user_store: Model<UserStore>,
+        client: Arc<Client>,
+        mut cx: AsyncAppContext,
+    ) -> Result<Model<Self>> {
+        let channel_id = channel.id;
+        let subscription = client.subscribe_to_entity(channel_id).unwrap();
+
+        let response = client
+            .request(proto::JoinChannelChat { channel_id })
+            .await?;
+        let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?;
+        let loaded_all_messages = response.done;
+
+        Ok(cx.build_model(|cx| {
+            cx.on_release(Self::release).detach();
+            let mut this = Self {
+                channel_id: channel.id,
+                user_store,
+                channel_store,
+                rpc: client,
+                outgoing_messages_lock: Default::default(),
+                messages: Default::default(),
+                acknowledged_message_ids: Default::default(),
+                loaded_all_messages,
+                next_pending_message_id: 0,
+                last_acknowledged_id: None,
+                rng: StdRng::from_entropy(),
+                _subscription: subscription.set_model(&cx.handle(), &mut cx.to_async()),
+            };
+            this.insert_messages(messages, cx);
+            this
+        })?)
+    }
+
+    fn release(&mut self, _: &mut AppContext) {
+        self.rpc
+            .send(proto::LeaveChannelChat {
+                channel_id: self.channel_id,
+            })
+            .log_err();
+    }
+
+    pub fn channel(&self, cx: &AppContext) -> Option<Arc<Channel>> {
+        self.channel_store
+            .read(cx)
+            .channel_for_id(self.channel_id)
+            .cloned()
+    }
+
+    pub fn client(&self) -> &Arc<Client> {
+        &self.rpc
+    }
+
+    pub fn send_message(
+        &mut self,
+        message: MessageParams,
+        cx: &mut ModelContext<Self>,
+    ) -> Result<Task<Result<u64>>> {
+        if message.text.is_empty() {
+            Err(anyhow!("message body can't be empty"))?;
+        }
+
+        let current_user = self
+            .user_store
+            .read(cx)
+            .current_user()
+            .ok_or_else(|| anyhow!("current_user is not present"))?;
+
+        let channel_id = self.channel_id;
+        let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id));
+        let nonce = self.rng.gen();
+        self.insert_messages(
+            SumTree::from_item(
+                ChannelMessage {
+                    id: pending_id,
+                    body: message.text.clone(),
+                    sender: current_user,
+                    timestamp: OffsetDateTime::now_utc(),
+                    mentions: message.mentions.clone(),
+                    nonce,
+                },
+                &(),
+            ),
+            cx,
+        );
+        let user_store = self.user_store.clone();
+        let rpc = self.rpc.clone();
+        let outgoing_messages_lock = self.outgoing_messages_lock.clone();
+        Ok(cx.spawn(move |this, mut cx| async move {
+            let outgoing_message_guard = outgoing_messages_lock.lock().await;
+            let request = rpc.request(proto::SendChannelMessage {
+                channel_id,
+                body: message.text,
+                nonce: Some(nonce.into()),
+                mentions: mentions_to_proto(&message.mentions),
+            });
+            let response = request.await?;
+            drop(outgoing_message_guard);
+            let response = response.message.ok_or_else(|| anyhow!("invalid message"))?;
+            let id = response.id;
+            let message = ChannelMessage::from_proto(response, &user_store, &mut cx).await?;
+            this.update(&mut cx, |this, cx| {
+                this.insert_messages(SumTree::from_item(message, &()), cx);
+            })?;
+            Ok(id)
+        }))
+    }
+
+    pub fn remove_message(&mut self, id: u64, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
+        let response = self.rpc.request(proto::RemoveChannelMessage {
+            channel_id: self.channel_id,
+            message_id: id,
+        });
+        cx.spawn(move |this, mut cx| async move {
+            response.await?;
+            this.update(&mut cx, |this, cx| {
+                this.message_removed(id, cx);
+            })?;
+            Ok(())
+        })
+    }
+
+    pub fn load_more_messages(&mut self, cx: &mut ModelContext<Self>) -> Option<Task<Option<()>>> {
+        if self.loaded_all_messages {
+            return None;
+        }
+
+        let rpc = self.rpc.clone();
+        let user_store = self.user_store.clone();
+        let channel_id = self.channel_id;
+        let before_message_id = self.first_loaded_message_id()?;
+        Some(cx.spawn(move |this, mut cx| {
+            async move {
+                let response = rpc
+                    .request(proto::GetChannelMessages {
+                        channel_id,
+                        before_message_id,
+                    })
+                    .await?;
+                let loaded_all_messages = response.done;
+                let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?;
+                this.update(&mut cx, |this, cx| {
+                    this.loaded_all_messages = loaded_all_messages;
+                    this.insert_messages(messages, cx);
+                })?;
+                anyhow::Ok(())
+            }
+            .log_err()
+        }))
+    }
+
+    pub fn first_loaded_message_id(&mut self) -> Option<u64> {
+        self.messages.first().and_then(|message| match message.id {
+            ChannelMessageId::Saved(id) => Some(id),
+            ChannelMessageId::Pending(_) => None,
+        })
+    }
+
+    /// Load all of the chat messages since a certain message id.
+    ///
+    /// For now, we always maintain a suffix of the channel's messages.
+    pub async fn load_history_since_message(
+        chat: Model<Self>,
+        message_id: u64,
+        mut cx: AsyncAppContext,
+    ) -> Option<usize> {
+        loop {
+            let step = chat
+                .update(&mut cx, |chat, cx| {
+                    if let Some(first_id) = chat.first_loaded_message_id() {
+                        if first_id <= message_id {
+                            let mut cursor = chat.messages.cursor::<(ChannelMessageId, Count)>();
+                            let message_id = ChannelMessageId::Saved(message_id);
+                            cursor.seek(&message_id, Bias::Left, &());
+                            return ControlFlow::Break(
+                                if cursor
+                                    .item()
+                                    .map_or(false, |message| message.id == message_id)
+                                {
+                                    Some(cursor.start().1 .0)
+                                } else {
+                                    None
+                                },
+                            );
+                        }
+                    }
+                    ControlFlow::Continue(chat.load_more_messages(cx))
+                })
+                .log_err()?;
+            match step {
+                ControlFlow::Break(ix) => return ix,
+                ControlFlow::Continue(task) => task?.await?,
+            }
+        }
+    }
+
+    pub fn acknowledge_last_message(&mut self, cx: &mut ModelContext<Self>) {
+        if let ChannelMessageId::Saved(latest_message_id) = self.messages.summary().max_id {
+            if self
+                .last_acknowledged_id
+                .map_or(true, |acknowledged_id| acknowledged_id < latest_message_id)
+            {
+                self.rpc
+                    .send(proto::AckChannelMessage {
+                        channel_id: self.channel_id,
+                        message_id: latest_message_id,
+                    })
+                    .ok();
+                self.last_acknowledged_id = Some(latest_message_id);
+                self.channel_store.update(cx, |store, cx| {
+                    store.acknowledge_message_id(self.channel_id, latest_message_id, cx);
+                });
+            }
+        }
+    }
+
+    pub fn rejoin(&mut self, cx: &mut ModelContext<Self>) {
+        let user_store = self.user_store.clone();
+        let rpc = self.rpc.clone();
+        let channel_id = self.channel_id;
+        cx.spawn(move |this, mut cx| {
+            async move {
+                let response = rpc.request(proto::JoinChannelChat { channel_id }).await?;
+                let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?;
+                let loaded_all_messages = response.done;
+
+                let pending_messages = this.update(&mut cx, |this, cx| {
+                    if let Some((first_new_message, last_old_message)) =
+                        messages.first().zip(this.messages.last())
+                    {
+                        if first_new_message.id > last_old_message.id {
+                            let old_messages = mem::take(&mut this.messages);
+                            cx.emit(ChannelChatEvent::MessagesUpdated {
+                                old_range: 0..old_messages.summary().count,
+                                new_count: 0,
+                            });
+                            this.loaded_all_messages = loaded_all_messages;
+                        }
+                    }
+
+                    this.insert_messages(messages, cx);
+                    if loaded_all_messages {
+                        this.loaded_all_messages = loaded_all_messages;
+                    }
+
+                    this.pending_messages().cloned().collect::<Vec<_>>()
+                })?;
+
+                for pending_message in pending_messages {
+                    let request = rpc.request(proto::SendChannelMessage {
+                        channel_id,
+                        body: pending_message.body,
+                        mentions: mentions_to_proto(&pending_message.mentions),
+                        nonce: Some(pending_message.nonce.into()),
+                    });
+                    let response = request.await?;
+                    let message = ChannelMessage::from_proto(
+                        response.message.ok_or_else(|| anyhow!("invalid message"))?,
+                        &user_store,
+                        &mut cx,
+                    )
+                    .await?;
+                    this.update(&mut cx, |this, cx| {
+                        this.insert_messages(SumTree::from_item(message, &()), cx);
+                    })?;
+                }
+
+                anyhow::Ok(())
+            }
+            .log_err()
+        })
+        .detach();
+    }
+
+    pub fn message_count(&self) -> usize {
+        self.messages.summary().count
+    }
+
+    pub fn messages(&self) -> &SumTree<ChannelMessage> {
+        &self.messages
+    }
+
+    pub fn message(&self, ix: usize) -> &ChannelMessage {
+        let mut cursor = self.messages.cursor::<Count>();
+        cursor.seek(&Count(ix), Bias::Right, &());
+        cursor.item().unwrap()
+    }
+
+    pub fn acknowledge_message(&mut self, id: u64) {
+        if self.acknowledged_message_ids.insert(id) {
+            self.rpc
+                .send(proto::AckChannelMessage {
+                    channel_id: self.channel_id,
+                    message_id: id,
+                })
+                .ok();
+        }
+    }
+
+    pub fn messages_in_range(&self, range: Range<usize>) -> impl Iterator<Item = &ChannelMessage> {
+        let mut cursor = self.messages.cursor::<Count>();
+        cursor.seek(&Count(range.start), Bias::Right, &());
+        cursor.take(range.len())
+    }
+
+    pub fn pending_messages(&self) -> impl Iterator<Item = &ChannelMessage> {
+        let mut cursor = self.messages.cursor::<ChannelMessageId>();
+        cursor.seek(&ChannelMessageId::Pending(0), Bias::Left, &());
+        cursor
+    }
+
+    async fn handle_message_sent(
+        this: Model<Self>,
+        message: TypedEnvelope<proto::ChannelMessageSent>,
+        _: Arc<Client>,
+        mut cx: AsyncAppContext,
+    ) -> Result<()> {
+        let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
+        let message = message
+            .payload
+            .message
+            .ok_or_else(|| anyhow!("empty message"))?;
+        let message_id = message.id;
+
+        let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
+        this.update(&mut cx, |this, cx| {
+            this.insert_messages(SumTree::from_item(message, &()), cx);
+            cx.emit(ChannelChatEvent::NewMessage {
+                channel_id: this.channel_id,
+                message_id,
+            })
+        })?;
+
+        Ok(())
+    }
+
+    async fn handle_message_removed(
+        this: Model<Self>,
+        message: TypedEnvelope<proto::RemoveChannelMessage>,
+        _: Arc<Client>,
+        mut cx: AsyncAppContext,
+    ) -> Result<()> {
+        this.update(&mut cx, |this, cx| {
+            this.message_removed(message.payload.message_id, cx)
+        })?;
+        Ok(())
+    }
+
+    fn insert_messages(&mut self, messages: SumTree<ChannelMessage>, cx: &mut ModelContext<Self>) {
+        if let Some((first_message, last_message)) = messages.first().zip(messages.last()) {
+            let nonces = messages
+                .cursor::<()>()
+                .map(|m| m.nonce)
+                .collect::<HashSet<_>>();
+
+            let mut old_cursor = self.messages.cursor::<(ChannelMessageId, Count)>();
+            let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left, &());
+            let start_ix = old_cursor.start().1 .0;
+            let removed_messages = old_cursor.slice(&last_message.id, Bias::Right, &());
+            let removed_count = removed_messages.summary().count;
+            let new_count = messages.summary().count;
+            let end_ix = start_ix + removed_count;
+
+            new_messages.append(messages, &());
+
+            let mut ranges = Vec::<Range<usize>>::new();
+            if new_messages.last().unwrap().is_pending() {
+                new_messages.append(old_cursor.suffix(&()), &());
+            } else {
+                new_messages.append(
+                    old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left, &()),
+                    &(),
+                );
+
+                while let Some(message) = old_cursor.item() {
+                    let message_ix = old_cursor.start().1 .0;
+                    if nonces.contains(&message.nonce) {
+                        if ranges.last().map_or(false, |r| r.end == message_ix) {
+                            ranges.last_mut().unwrap().end += 1;
+                        } else {
+                            ranges.push(message_ix..message_ix + 1);
+                        }
+                    } else {
+                        new_messages.push(message.clone(), &());
+                    }
+                    old_cursor.next(&());
+                }
+            }
+
+            drop(old_cursor);
+            self.messages = new_messages;
+
+            for range in ranges.into_iter().rev() {
+                cx.emit(ChannelChatEvent::MessagesUpdated {
+                    old_range: range,
+                    new_count: 0,
+                });
+            }
+            cx.emit(ChannelChatEvent::MessagesUpdated {
+                old_range: start_ix..end_ix,
+                new_count,
+            });
+
+            cx.notify();
+        }
+    }
+
+    fn message_removed(&mut self, id: u64, cx: &mut ModelContext<Self>) {
+        let mut cursor = self.messages.cursor::<ChannelMessageId>();
+        let mut messages = cursor.slice(&ChannelMessageId::Saved(id), Bias::Left, &());
+        if let Some(item) = cursor.item() {
+            if item.id == ChannelMessageId::Saved(id) {
+                let ix = messages.summary().count;
+                cursor.next(&());
+                messages.append(cursor.suffix(&()), &());
+                drop(cursor);
+                self.messages = messages;
+                cx.emit(ChannelChatEvent::MessagesUpdated {
+                    old_range: ix..ix + 1,
+                    new_count: 0,
+                });
+            }
+        }
+    }
+}
+
+async fn messages_from_proto(
+    proto_messages: Vec<proto::ChannelMessage>,
+    user_store: &Model<UserStore>,
+    cx: &mut AsyncAppContext,
+) -> Result<SumTree<ChannelMessage>> {
+    let messages = ChannelMessage::from_proto_vec(proto_messages, user_store, cx).await?;
+    let mut result = SumTree::new();
+    result.extend(messages, &());
+    Ok(result)
+}
+
+impl ChannelMessage {
+    pub async fn from_proto(
+        message: proto::ChannelMessage,
+        user_store: &Model<UserStore>,
+        cx: &mut AsyncAppContext,
+    ) -> Result<Self> {
+        let sender = user_store
+            .update(cx, |user_store, cx| {
+                user_store.get_user(message.sender_id, cx)
+            })?
+            .await?;
+        Ok(ChannelMessage {
+            id: ChannelMessageId::Saved(message.id),
+            body: message.body,
+            mentions: message
+                .mentions
+                .into_iter()
+                .filter_map(|mention| {
+                    let range = mention.range?;
+                    Some((range.start as usize..range.end as usize, mention.user_id))
+                })
+                .collect(),
+            timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?,
+            sender,
+            nonce: message
+                .nonce
+                .ok_or_else(|| anyhow!("nonce is required"))?
+                .into(),
+        })
+    }
+
+    pub fn is_pending(&self) -> bool {
+        matches!(self.id, ChannelMessageId::Pending(_))
+    }
+
+    pub async fn from_proto_vec(
+        proto_messages: Vec<proto::ChannelMessage>,
+        user_store: &Model<UserStore>,
+        cx: &mut AsyncAppContext,
+    ) -> Result<Vec<Self>> {
+        let unique_user_ids = proto_messages
+            .iter()
+            .map(|m| m.sender_id)
+            .collect::<HashSet<_>>()
+            .into_iter()
+            .collect();
+        user_store
+            .update(cx, |user_store, cx| {
+                user_store.get_users(unique_user_ids, cx)
+            })?
+            .await?;
+
+        let mut messages = Vec::with_capacity(proto_messages.len());
+        for message in proto_messages {
+            messages.push(ChannelMessage::from_proto(message, user_store, cx).await?);
+        }
+        Ok(messages)
+    }
+}
+
+pub fn mentions_to_proto(mentions: &[(Range<usize>, UserId)]) -> Vec<proto::ChatMention> {
+    mentions
+        .iter()
+        .map(|(range, user_id)| proto::ChatMention {
+            range: Some(proto::Range {
+                start: range.start as u64,
+                end: range.end as u64,
+            }),
+            user_id: *user_id as u64,
+        })
+        .collect()
+}
+
+impl sum_tree::Item for ChannelMessage {
+    type Summary = ChannelMessageSummary;
+
+    fn summary(&self) -> Self::Summary {
+        ChannelMessageSummary {
+            max_id: self.id,
+            count: 1,
+        }
+    }
+}
+
+impl Default for ChannelMessageId {
+    fn default() -> Self {
+        Self::Saved(0)
+    }
+}
+
+impl sum_tree::Summary for ChannelMessageSummary {
+    type Context = ();
+
+    fn add_summary(&mut self, summary: &Self, _: &()) {
+        self.max_id = summary.max_id;
+        self.count += summary.count;
+    }
+}
+
+impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId {
+    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
+        debug_assert!(summary.max_id > *self);
+        *self = summary.max_id;
+    }
+}
+
+impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count {
+    fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
+        self.0 += summary.count;
+    }
+}
+
+impl<'a> From<&'a str> for MessageParams {
+    fn from(value: &'a str) -> Self {
+        Self {
+            text: value.into(),
+            mentions: Vec::new(),
+        }
+    }
+}

crates/channel2/src/channel_store.rs 🔗

@@ -0,0 +1,1021 @@
+mod channel_index;
+
+use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
+use anyhow::{anyhow, Result};
+use channel_index::ChannelIndex;
+use client2::{Client, Subscription, User, UserId, UserStore};
+use collections::{hash_map, HashMap, HashSet};
+use db2::RELEASE_CHANNEL;
+use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
+use gpui2::{
+    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
+};
+use rpc2::{
+    proto::{self, ChannelVisibility},
+    TypedEnvelope,
+};
+use std::{mem, sync::Arc, time::Duration};
+use util::{async_maybe, ResultExt};
+
+pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
+    let channel_store =
+        cx.build_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
+    cx.set_global(channel_store);
+}
+
+pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
+
+pub type ChannelId = u64;
+
+pub struct ChannelStore {
+    pub channel_index: ChannelIndex,
+    channel_invitations: Vec<Arc<Channel>>,
+    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
+    outgoing_invites: HashSet<(ChannelId, UserId)>,
+    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
+    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
+    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
+    client: Arc<Client>,
+    user_store: Model<UserStore>,
+    _rpc_subscription: Subscription,
+    _watch_connection_status: Task<Option<()>>,
+    disconnect_channel_buffers_task: Option<Task<()>>,
+    _update_channels: Task<()>,
+}
+
+#[derive(Clone, Debug, PartialEq)]
+pub struct Channel {
+    pub id: ChannelId,
+    pub name: String,
+    pub visibility: proto::ChannelVisibility,
+    pub role: proto::ChannelRole,
+    pub unseen_note_version: Option<(u64, clock::Global)>,
+    pub unseen_message_id: Option<u64>,
+    pub parent_path: Vec<u64>,
+}
+
+impl Channel {
+    pub fn link(&self) -> String {
+        RELEASE_CHANNEL.link_prefix().to_owned()
+            + "channel/"
+            + &self.slug()
+            + "-"
+            + &self.id.to_string()
+    }
+
+    pub fn slug(&self) -> String {
+        let slug: String = self
+            .name
+            .chars()
+            .map(|c| if c.is_alphanumeric() { c } else { '-' })
+            .collect();
+
+        slug.trim_matches(|c| c == '-').to_string()
+    }
+
+    pub fn can_edit_notes(&self) -> bool {
+        self.role == proto::ChannelRole::Member || self.role == proto::ChannelRole::Admin
+    }
+}
+
+pub struct ChannelMembership {
+    pub user: Arc<User>,
+    pub kind: proto::channel_member::Kind,
+    pub role: proto::ChannelRole,
+}
+impl ChannelMembership {
+    pub fn sort_key(&self) -> MembershipSortKey {
+        MembershipSortKey {
+            role_order: match self.role {
+                proto::ChannelRole::Admin => 0,
+                proto::ChannelRole::Member => 1,
+                proto::ChannelRole::Banned => 2,
+                proto::ChannelRole::Guest => 3,
+            },
+            kind_order: match self.kind {
+                proto::channel_member::Kind::Member => 0,
+                proto::channel_member::Kind::AncestorMember => 1,
+                proto::channel_member::Kind::Invitee => 2,
+            },
+            username_order: self.user.github_login.as_str(),
+        }
+    }
+}
+
+#[derive(PartialOrd, Ord, PartialEq, Eq)]
+pub struct MembershipSortKey<'a> {
+    role_order: u8,
+    kind_order: u8,
+    username_order: &'a str,
+}
+
+pub enum ChannelEvent {
+    ChannelCreated(ChannelId),
+    ChannelRenamed(ChannelId),
+}
+
+impl EventEmitter for ChannelStore {
+    type Event = ChannelEvent;
+}
+
+enum OpenedModelHandle<E> {
+    Open(WeakModel<E>),
+    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
+}
+
+impl ChannelStore {
+    pub fn global(cx: &AppContext) -> Model<Self> {
+        cx.global::<Model<Self>>().clone()
+    }
+
+    pub fn new(
+        client: Arc<Client>,
+        user_store: Model<UserStore>,
+        cx: &mut ModelContext<Self>,
+    ) -> Self {
+        let rpc_subscription =
+            client.add_message_handler(cx.weak_model(), Self::handle_update_channels);
+
+        let mut connection_status = client.status();
+        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
+        let watch_connection_status = cx.spawn(|this, mut cx| async move {
+            while let Some(status) = connection_status.next().await {
+                let this = this.upgrade()?;
+                match status {
+                    client2::Status::Connected { .. } => {
+                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
+                            .ok()?
+                            .await
+                            .log_err()?;
+                    }
+                    client2::Status::SignedOut | client2::Status::UpgradeRequired => {
+                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
+                            .ok();
+                    }
+                    _ => {
+                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
+                            .ok();
+                    }
+                }
+            }
+            Some(())
+        });
+
+        Self {
+            channel_invitations: Vec::default(),
+            channel_index: ChannelIndex::default(),
+            channel_participants: Default::default(),
+            outgoing_invites: Default::default(),
+            opened_buffers: Default::default(),
+            opened_chats: Default::default(),
+            update_channels_tx,
+            client,
+            user_store,
+            _rpc_subscription: rpc_subscription,
+            _watch_connection_status: watch_connection_status,
+            disconnect_channel_buffers_task: None,
+            _update_channels: cx.spawn(|this, mut cx| async move {
+                async_maybe!({
+                    while let Some(update_channels) = update_channels_rx.next().await {
+                        if let Some(this) = this.upgrade() {
+                            let update_task = this.update(&mut cx, |this, cx| {
+                                this.update_channels(update_channels, cx)
+                            })?;
+                            if let Some(update_task) = update_task {
+                                update_task.await.log_err();
+                            }
+                        }
+                    }
+                    anyhow::Ok(())
+                })
+                .await
+                .log_err();
+            }),
+        }
+    }
+
+    pub fn client(&self) -> Arc<Client> {
+        self.client.clone()
+    }
+
+    /// Returns the number of unique channels in the store
+    pub fn channel_count(&self) -> usize {
+        self.channel_index.by_id().len()
+    }
+
+    /// Returns the index of a channel ID in the list of unique channels
+    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
+        self.channel_index
+            .by_id()
+            .keys()
+            .position(|id| *id == channel_id)
+    }
+
+    /// Returns an iterator over all unique channels
+    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
+        self.channel_index.by_id().values()
+    }
+
+    /// Iterate over all entries in the channel DAG
+    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
+        self.channel_index
+            .ordered_channels()
+            .iter()
+            .filter_map(move |id| {
+                let channel = self.channel_index.by_id().get(id)?;
+                Some((channel.parent_path.len(), channel))
+            })
+    }
+
+    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
+        let channel_id = self.channel_index.ordered_channels().get(ix)?;
+        self.channel_index.by_id().get(channel_id)
+    }
+
+    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
+        self.channel_index.by_id().values().nth(ix)
+    }
+
+    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
+        self.channel_invitations
+            .iter()
+            .any(|channel| channel.id == channel_id)
+    }
+
+    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
+        &self.channel_invitations
+    }
+
+    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
+        self.channel_index.by_id().get(&channel_id)
+    }
+
+    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
+        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
+            if let OpenedModelHandle::Open(buffer) = buffer {
+                return buffer.upgrade().is_some();
+            }
+        }
+        false
+    }
+
+    pub fn open_channel_buffer(
+        &mut self,
+        channel_id: ChannelId,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<Model<ChannelBuffer>>> {
+        let client = self.client.clone();
+        let user_store = self.user_store.clone();
+        let channel_store = cx.handle();
+        self.open_channel_resource(
+            channel_id,
+            |this| &mut this.opened_buffers,
+            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
+            cx,
+        )
+    }
+
+    pub fn fetch_channel_messages(
+        &self,
+        message_ids: Vec<u64>,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<Vec<ChannelMessage>>> {
+        let request = if message_ids.is_empty() {
+            None
+        } else {
+            Some(
+                self.client
+                    .request(proto::GetChannelMessagesById { message_ids }),
+            )
+        };
+        cx.spawn(|this, mut cx| async move {
+            if let Some(request) = request {
+                let response = request.await?;
+                let this = this
+                    .upgrade()
+                    .ok_or_else(|| anyhow!("channel store dropped"))?;
+                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
+                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
+            } else {
+                Ok(Vec::new())
+            }
+        })
+    }
+
+    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option<bool> {
+        self.channel_index
+            .by_id()
+            .get(&channel_id)
+            .map(|channel| channel.unseen_note_version.is_some())
+    }
+
+    pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
+        self.channel_index
+            .by_id()
+            .get(&channel_id)
+            .map(|channel| channel.unseen_message_id.is_some())
+    }
+
+    pub fn notes_changed(
+        &mut self,
+        channel_id: ChannelId,
+        epoch: u64,
+        version: &clock::Global,
+        cx: &mut ModelContext<Self>,
+    ) {
+        self.channel_index.note_changed(channel_id, epoch, version);
+        cx.notify();
+    }
+
+    pub fn new_message(
+        &mut self,
+        channel_id: ChannelId,
+        message_id: u64,
+        cx: &mut ModelContext<Self>,
+    ) {
+        self.channel_index.new_message(channel_id, message_id);
+        cx.notify();
+    }
+
+    pub fn acknowledge_message_id(
+        &mut self,
+        channel_id: ChannelId,
+        message_id: u64,
+        cx: &mut ModelContext<Self>,
+    ) {
+        self.channel_index
+            .acknowledge_message_id(channel_id, message_id);
+        cx.notify();
+    }
+
+    pub fn acknowledge_notes_version(
+        &mut self,
+        channel_id: ChannelId,
+        epoch: u64,
+        version: &clock::Global,
+        cx: &mut ModelContext<Self>,
+    ) {
+        self.channel_index
+            .acknowledge_note_version(channel_id, epoch, version);
+        cx.notify();
+    }
+
+    pub fn open_channel_chat(
+        &mut self,
+        channel_id: ChannelId,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<Model<ChannelChat>>> {
+        let client = self.client.clone();
+        let user_store = self.user_store.clone();
+        let this = cx.handle();
+        self.open_channel_resource(
+            channel_id,
+            |this| &mut this.opened_chats,
+            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
+            cx,
+        )
+    }
+
+    /// Asynchronously open a given resource associated with a channel.
+    ///
+    /// Make sure that the resource is only opened once, even if this method
+    /// is called multiple times with the same channel id while the first task
+    /// is still running.
+    fn open_channel_resource<T, F, Fut>(
+        &mut self,
+        channel_id: ChannelId,
+        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
+        load: F,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<Model<T>>>
+    where
+        F: 'static + Send + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
+        Fut: Send + Future<Output = Result<Model<T>>>,
+        T: 'static,
+    {
+        let task = loop {
+            match get_map(self).entry(channel_id) {
+                hash_map::Entry::Occupied(e) => match e.get() {
+                    OpenedModelHandle::Open(model) => {
+                        if let Some(model) = model.upgrade() {
+                            break Task::ready(Ok(model)).shared();
+                        } else {
+                            get_map(self).remove(&channel_id);
+                            continue;
+                        }
+                    }
+                    OpenedModelHandle::Loading(task) => {
+                        break task.clone();
+                    }
+                },
+                hash_map::Entry::Vacant(e) => {
+                    let task = cx
+                        .spawn(move |this, mut cx| async move {
+                            let channel = this.update(&mut cx, |this, _| {
+                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
+                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
+                                })
+                            })??;
+
+                            load(channel, cx).await.map_err(Arc::new)
+                        })
+                        .shared();
+
+                    e.insert(OpenedModelHandle::Loading(task.clone()));
+                    cx.spawn({
+                        let task = task.clone();
+                        move |this, mut cx| async move {
+                            let result = task.await;
+                            this.update(&mut cx, |this, _| match result {
+                                Ok(model) => {
+                                    get_map(this).insert(
+                                        channel_id,
+                                        OpenedModelHandle::Open(model.downgrade()),
+                                    );
+                                }
+                                Err(_) => {
+                                    get_map(this).remove(&channel_id);
+                                }
+                            })
+                            .ok();
+                        }
+                    })
+                    .detach();
+                    break task;
+                }
+            }
+        };
+        cx.executor()
+            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
+    }
+
+    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
+        let Some(channel) = self.channel_for_id(channel_id) else {
+            return false;
+        };
+        channel.role == proto::ChannelRole::Admin
+    }
+
+    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
+        self.channel_participants
+            .get(&channel_id)
+            .map_or(&[], |v| v.as_slice())
+    }
+
+    pub fn create_channel(
+        &self,
+        name: &str,
+        parent_id: Option<ChannelId>,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<ChannelId>> {
+        let client = self.client.clone();
+        let name = name.trim_start_matches("#").to_owned();
+        cx.spawn(move |this, mut cx| async move {
+            let response = client
+                .request(proto::CreateChannel { name, parent_id })
+                .await?;
+
+            let channel = response
+                .channel
+                .ok_or_else(|| anyhow!("missing channel in response"))?;
+            let channel_id = channel.id;
+
+            this.update(&mut cx, |this, cx| {
+                let task = this.update_channels(
+                    proto::UpdateChannels {
+                        channels: vec![channel],
+                        ..Default::default()
+                    },
+                    cx,
+                );
+                assert!(task.is_none());
+
+                // This event is emitted because the collab panel wants to clear the pending edit state
+                // before this frame is rendered. But we can't guarantee that the collab panel's future
+                // will resolve before this flush_effects finishes. Synchronously emitting this event
+                // ensures that the collab panel will observe this creation before the frame completes
+                cx.emit(ChannelEvent::ChannelCreated(channel_id));
+            })?;
+
+            Ok(channel_id)
+        })
+    }
+
+    pub fn move_channel(
+        &mut self,
+        channel_id: ChannelId,
+        to: Option<ChannelId>,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<()>> {
+        let client = self.client.clone();
+        cx.spawn(move |_, _| async move {
+            let _ = client
+                .request(proto::MoveChannel { channel_id, to })
+                .await?;
+
+            Ok(())
+        })
+    }
+
+    pub fn set_channel_visibility(
+        &mut self,
+        channel_id: ChannelId,
+        visibility: ChannelVisibility,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<()>> {
+        let client = self.client.clone();
+        cx.spawn(move |_, _| async move {
+            let _ = client
+                .request(proto::SetChannelVisibility {
+                    channel_id,
+                    visibility: visibility.into(),
+                })
+                .await?;
+
+            Ok(())
+        })
+    }
+
+    pub fn invite_member(
+        &mut self,
+        channel_id: ChannelId,
+        user_id: UserId,
+        role: proto::ChannelRole,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<()>> {
+        if !self.outgoing_invites.insert((channel_id, user_id)) {
+            return Task::ready(Err(anyhow!("invite request already in progress")));
+        }
+
+        cx.notify();
+        let client = self.client.clone();
+        cx.spawn(move |this, mut cx| async move {
+            let result = client
+                .request(proto::InviteChannelMember {
+                    channel_id,
+                    user_id,
+                    role: role.into(),
+                })
+                .await;
+
+            this.update(&mut cx, |this, cx| {
+                this.outgoing_invites.remove(&(channel_id, user_id));
+                cx.notify();
+            })?;
+
+            result?;
+
+            Ok(())
+        })
+    }
+
+    pub fn remove_member(
+        &mut self,
+        channel_id: ChannelId,
+        user_id: u64,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<()>> {
+        if !self.outgoing_invites.insert((channel_id, user_id)) {
+            return Task::ready(Err(anyhow!("invite request already in progress")));
+        }
+
+        cx.notify();
+        let client = self.client.clone();
+        cx.spawn(move |this, mut cx| async move {
+            let result = client
+                .request(proto::RemoveChannelMember {
+                    channel_id,
+                    user_id,
+                })
+                .await;
+
+            this.update(&mut cx, |this, cx| {
+                this.outgoing_invites.remove(&(channel_id, user_id));
+                cx.notify();
+            })?;
+            result?;
+            Ok(())
+        })
+    }
+
+    pub fn set_member_role(
+        &mut self,
+        channel_id: ChannelId,
+        user_id: UserId,
+        role: proto::ChannelRole,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<()>> {
+        if !self.outgoing_invites.insert((channel_id, user_id)) {
+            return Task::ready(Err(anyhow!("member request already in progress")));
+        }
+
+        cx.notify();
+        let client = self.client.clone();
+        cx.spawn(move |this, mut cx| async move {
+            let result = client
+                .request(proto::SetChannelMemberRole {
+                    channel_id,
+                    user_id,
+                    role: role.into(),
+                })
+                .await;
+
+            this.update(&mut cx, |this, cx| {
+                this.outgoing_invites.remove(&(channel_id, user_id));
+                cx.notify();
+            })?;
+
+            result?;
+            Ok(())
+        })
+    }
+
+    pub fn rename(
+        &mut self,
+        channel_id: ChannelId,
+        new_name: &str,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<()>> {
+        let client = self.client.clone();
+        let name = new_name.to_string();
+        cx.spawn(move |this, mut cx| async move {
+            let channel = client
+                .request(proto::RenameChannel { channel_id, name })
+                .await?
+                .channel
+                .ok_or_else(|| anyhow!("missing channel in response"))?;
+            this.update(&mut cx, |this, cx| {
+                let task = this.update_channels(
+                    proto::UpdateChannels {
+                        channels: vec![channel],
+                        ..Default::default()
+                    },
+                    cx,
+                );
+                assert!(task.is_none());
+
+                // This event is emitted because the collab panel wants to clear the pending edit state
+                // before this frame is rendered. But we can't guarantee that the collab panel's future
+                // will resolve before this flush_effects finishes. Synchronously emitting this event
+                // ensures that the collab panel will observe this creation before the frame complete
+                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
+            })?;
+            Ok(())
+        })
+    }
+
+    pub fn respond_to_channel_invite(
+        &mut self,
+        channel_id: ChannelId,
+        accept: bool,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<()>> {
+        let client = self.client.clone();
+        cx.executor().spawn(async move {
+            client
+                .request(proto::RespondToChannelInvite { channel_id, accept })
+                .await?;
+            Ok(())
+        })
+    }
+
+    pub fn get_channel_member_details(
+        &self,
+        channel_id: ChannelId,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<Vec<ChannelMembership>>> {
+        let client = self.client.clone();
+        let user_store = self.user_store.downgrade();
+        cx.spawn(move |_, mut cx| async move {
+            let response = client
+                .request(proto::GetChannelMembers { channel_id })
+                .await?;
+
+            let user_ids = response.members.iter().map(|m| m.user_id).collect();
+            let user_store = user_store
+                .upgrade()
+                .ok_or_else(|| anyhow!("user store dropped"))?;
+            let users = user_store
+                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
+                .await?;
+
+            Ok(users
+                .into_iter()
+                .zip(response.members)
+                .filter_map(|(user, member)| {
+                    Some(ChannelMembership {
+                        user,
+                        role: member.role(),
+                        kind: member.kind(),
+                    })
+                })
+                .collect())
+        })
+    }
+
+    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
+        let client = self.client.clone();
+        async move {
+            client.request(proto::DeleteChannel { channel_id }).await?;
+            Ok(())
+        }
+    }
+
+    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
+        false
+    }
+
+    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
+        self.outgoing_invites.contains(&(channel_id, user_id))
+    }
+
+    async fn handle_update_channels(
+        this: Model<Self>,
+        message: TypedEnvelope<proto::UpdateChannels>,
+        _: Arc<Client>,
+        mut cx: AsyncAppContext,
+    ) -> Result<()> {
+        this.update(&mut cx, |this, _| {
+            this.update_channels_tx
+                .unbounded_send(message.payload)
+                .unwrap();
+        })?;
+        Ok(())
+    }
+
+    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
+        self.channel_index.clear();
+        self.channel_invitations.clear();
+        self.channel_participants.clear();
+        self.channel_index.clear();
+        self.outgoing_invites.clear();
+        self.disconnect_channel_buffers_task.take();
+
+        for chat in self.opened_chats.values() {
+            if let OpenedModelHandle::Open(chat) = chat {
+                if let Some(chat) = chat.upgrade() {
+                    chat.update(cx, |chat, cx| {
+                        chat.rejoin(cx);
+                    });
+                }
+            }
+        }
+
+        let mut buffer_versions = Vec::new();
+        for buffer in self.opened_buffers.values() {
+            if let OpenedModelHandle::Open(buffer) = buffer {
+                if let Some(buffer) = buffer.upgrade() {
+                    let channel_buffer = buffer.read(cx);
+                    let buffer = channel_buffer.buffer().read(cx);
+                    buffer_versions.push(proto::ChannelBufferVersion {
+                        channel_id: channel_buffer.channel_id,
+                        epoch: channel_buffer.epoch(),
+                        version: language2::proto::serialize_version(&buffer.version()),
+                    });
+                }
+            }
+        }
+
+        if buffer_versions.is_empty() {
+            return Task::ready(Ok(()));
+        }
+
+        let response = self.client.request(proto::RejoinChannelBuffers {
+            buffers: buffer_versions,
+        });
+
+        cx.spawn(|this, mut cx| async move {
+            let mut response = response.await?;
+
+            this.update(&mut cx, |this, cx| {
+                this.opened_buffers.retain(|_, buffer| match buffer {
+                    OpenedModelHandle::Open(channel_buffer) => {
+                        let Some(channel_buffer) = channel_buffer.upgrade() else {
+                            return false;
+                        };
+
+                        channel_buffer.update(cx, |channel_buffer, cx| {
+                            let channel_id = channel_buffer.channel_id;
+                            if let Some(remote_buffer) = response
+                                .buffers
+                                .iter_mut()
+                                .find(|buffer| buffer.channel_id == channel_id)
+                            {
+                                let channel_id = channel_buffer.channel_id;
+                                let remote_version =
+                                    language2::proto::deserialize_version(&remote_buffer.version);
+
+                                channel_buffer.replace_collaborators(
+                                    mem::take(&mut remote_buffer.collaborators),
+                                    cx,
+                                );
+
+                                let operations = channel_buffer
+                                    .buffer()
+                                    .update(cx, |buffer, cx| {
+                                        let outgoing_operations =
+                                            buffer.serialize_ops(Some(remote_version), cx);
+                                        let incoming_operations =
+                                            mem::take(&mut remote_buffer.operations)
+                                                .into_iter()
+                                                .map(language2::proto::deserialize_operation)
+                                                .collect::<Result<Vec<_>>>()?;
+                                        buffer.apply_ops(incoming_operations, cx)?;
+                                        anyhow::Ok(outgoing_operations)
+                                    })
+                                    .log_err();
+
+                                if let Some(operations) = operations {
+                                    let client = this.client.clone();
+                                    cx.executor()
+                                        .spawn(async move {
+                                            let operations = operations.await;
+                                            for chunk in
+                                                language2::proto::split_operations(operations)
+                                            {
+                                                client
+                                                    .send(proto::UpdateChannelBuffer {
+                                                        channel_id,
+                                                        operations: chunk,
+                                                    })
+                                                    .ok();
+                                            }
+                                        })
+                                        .detach();
+                                    return true;
+                                }
+                            }
+
+                            channel_buffer.disconnect(cx);
+                            false
+                        })
+                    }
+                    OpenedModelHandle::Loading(_) => true,
+                });
+            })
+            .ok();
+            anyhow::Ok(())
+        })
+    }
+
+    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
+        cx.notify();
+
+        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
+            cx.spawn(move |this, mut cx| async move {
+                if wait_for_reconnect {
+                    cx.executor().timer(RECONNECT_TIMEOUT).await;
+                }
+
+                if let Some(this) = this.upgrade() {
+                    this.update(&mut cx, |this, cx| {
+                        for (_, buffer) in this.opened_buffers.drain() {
+                            if let OpenedModelHandle::Open(buffer) = buffer {
+                                if let Some(buffer) = buffer.upgrade() {
+                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
+                                }
+                            }
+                        }
+                    })
+                    .ok();
+                }
+            })
+        });
+    }
+
+    pub(crate) fn update_channels(
+        &mut self,
+        payload: proto::UpdateChannels,
+        cx: &mut ModelContext<ChannelStore>,
+    ) -> Option<Task<Result<()>>> {
+        if !payload.remove_channel_invitations.is_empty() {
+            self.channel_invitations
+                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
+        }
+        for channel in payload.channel_invitations {
+            match self
+                .channel_invitations
+                .binary_search_by_key(&channel.id, |c| c.id)
+            {
+                Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
+                Err(ix) => self.channel_invitations.insert(
+                    ix,
+                    Arc::new(Channel {
+                        id: channel.id,
+                        visibility: channel.visibility(),
+                        role: channel.role(),
+                        name: channel.name,
+                        unseen_note_version: None,
+                        unseen_message_id: None,
+                        parent_path: channel.parent_path,
+                    }),
+                ),
+            }
+        }
+
+        let channels_changed = !payload.channels.is_empty()
+            || !payload.delete_channels.is_empty()
+            || !payload.unseen_channel_messages.is_empty()
+            || !payload.unseen_channel_buffer_changes.is_empty();
+
+        if channels_changed {
+            if !payload.delete_channels.is_empty() {
+                self.channel_index.delete_channels(&payload.delete_channels);
+                self.channel_participants
+                    .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));
+
+                for channel_id in &payload.delete_channels {
+                    let channel_id = *channel_id;
+                    if payload
+                        .channels
+                        .iter()
+                        .any(|channel| channel.id == channel_id)
+                    {
+                        continue;
+                    }
+                    if let Some(OpenedModelHandle::Open(buffer)) =
+                        self.opened_buffers.remove(&channel_id)
+                    {
+                        if let Some(buffer) = buffer.upgrade() {
+                            buffer.update(cx, ChannelBuffer::disconnect);
+                        }
+                    }
+                }
+            }
+
+            let mut index = self.channel_index.bulk_insert();
+            for channel in payload.channels {
+                let id = channel.id;
+                let channel_changed = index.insert(channel);
+
+                if channel_changed {
+                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
+                        if let Some(buffer) = buffer.upgrade() {
+                            buffer.update(cx, ChannelBuffer::channel_changed);
+                        }
+                    }
+                }
+            }
+
+            for unseen_buffer_change in payload.unseen_channel_buffer_changes {
+                let version = language2::proto::deserialize_version(&unseen_buffer_change.version);
+                index.note_changed(
+                    unseen_buffer_change.channel_id,
+                    unseen_buffer_change.epoch,
+                    &version,
+                );
+            }
+
+            for unseen_channel_message in payload.unseen_channel_messages {
+                index.new_messages(
+                    unseen_channel_message.channel_id,
+                    unseen_channel_message.message_id,
+                );
+            }
+        }
+
+        cx.notify();
+        if payload.channel_participants.is_empty() {
+            return None;
+        }
+
+        let mut all_user_ids = Vec::new();
+        let channel_participants = payload.channel_participants;
+        for entry in &channel_participants {
+            for user_id in entry.participant_user_ids.iter() {
+                if let Err(ix) = all_user_ids.binary_search(user_id) {
+                    all_user_ids.insert(ix, *user_id);
+                }
+            }
+        }
+
+        let users = self
+            .user_store
+            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
+        Some(cx.spawn(|this, mut cx| async move {
+            let users = users.await?;
+
+            this.update(&mut cx, |this, cx| {
+                for entry in &channel_participants {
+                    let mut participants: Vec<_> = entry
+                        .participant_user_ids
+                        .iter()
+                        .filter_map(|user_id| {
+                            users
+                                .binary_search_by_key(&user_id, |user| &user.id)
+                                .ok()
+                                .map(|ix| users[ix].clone())
+                        })
+                        .collect();
+
+                    participants.sort_by_key(|u| u.id);
+
+                    this.channel_participants
+                        .insert(entry.channel_id, participants);
+                }
+
+                cx.notify();
+            })
+        }))
+    }
+}

crates/channel2/src/channel_store/channel_index.rs 🔗

@@ -0,0 +1,184 @@
+use crate::{Channel, ChannelId};
+use collections::BTreeMap;
+use rpc2::proto;
+use std::sync::Arc;
+
+#[derive(Default, Debug)]
+pub struct ChannelIndex {
+    channels_ordered: Vec<ChannelId>,
+    channels_by_id: BTreeMap<ChannelId, Arc<Channel>>,
+}
+
+impl ChannelIndex {
+    pub fn by_id(&self) -> &BTreeMap<ChannelId, Arc<Channel>> {
+        &self.channels_by_id
+    }
+
+    pub fn ordered_channels(&self) -> &[ChannelId] {
+        &self.channels_ordered
+    }
+
+    pub fn clear(&mut self) {
+        self.channels_ordered.clear();
+        self.channels_by_id.clear();
+    }
+
+    /// Delete the given channels from this index.
+    pub fn delete_channels(&mut self, channels: &[ChannelId]) {
+        self.channels_by_id
+            .retain(|channel_id, _| !channels.contains(channel_id));
+        self.channels_ordered
+            .retain(|channel_id| !channels.contains(channel_id));
+    }
+
+    pub fn bulk_insert(&mut self) -> ChannelPathsInsertGuard {
+        ChannelPathsInsertGuard {
+            channels_ordered: &mut self.channels_ordered,
+            channels_by_id: &mut self.channels_by_id,
+        }
+    }
+
+    pub fn acknowledge_note_version(
+        &mut self,
+        channel_id: ChannelId,
+        epoch: u64,
+        version: &clock::Global,
+    ) {
+        if let Some(channel) = self.channels_by_id.get_mut(&channel_id) {
+            let channel = Arc::make_mut(channel);
+            if let Some((unseen_epoch, unseen_version)) = &channel.unseen_note_version {
+                if epoch > *unseen_epoch
+                    || epoch == *unseen_epoch && version.observed_all(unseen_version)
+                {
+                    channel.unseen_note_version = None;
+                }
+            }
+        }
+    }
+
+    pub fn acknowledge_message_id(&mut self, channel_id: ChannelId, message_id: u64) {
+        if let Some(channel) = self.channels_by_id.get_mut(&channel_id) {
+            let channel = Arc::make_mut(channel);
+            if let Some(unseen_message_id) = channel.unseen_message_id {
+                if message_id >= unseen_message_id {
+                    channel.unseen_message_id = None;
+                }
+            }
+        }
+    }
+
+    pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) {
+        insert_note_changed(&mut self.channels_by_id, channel_id, epoch, version);
+    }
+
+    pub fn new_message(&mut self, channel_id: ChannelId, message_id: u64) {
+        insert_new_message(&mut self.channels_by_id, channel_id, message_id)
+    }
+}
+
+/// A guard for ensuring that the paths index maintains its sort and uniqueness
+/// invariants after a series of insertions
+#[derive(Debug)]
+pub struct ChannelPathsInsertGuard<'a> {
+    channels_ordered: &'a mut Vec<ChannelId>,
+    channels_by_id: &'a mut BTreeMap<ChannelId, Arc<Channel>>,
+}
+
+impl<'a> ChannelPathsInsertGuard<'a> {
+    pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) {
+        insert_note_changed(&mut self.channels_by_id, channel_id, epoch, &version);
+    }
+
+    pub fn new_messages(&mut self, channel_id: ChannelId, message_id: u64) {
+        insert_new_message(&mut self.channels_by_id, channel_id, message_id)
+    }
+
+    pub fn insert(&mut self, channel_proto: proto::Channel) -> bool {
+        let mut ret = false;
+        if let Some(existing_channel) = self.channels_by_id.get_mut(&channel_proto.id) {
+            let existing_channel = Arc::make_mut(existing_channel);
+
+            ret = existing_channel.visibility != channel_proto.visibility()
+                || existing_channel.role != channel_proto.role()
+                || existing_channel.name != channel_proto.name;
+
+            existing_channel.visibility = channel_proto.visibility();
+            existing_channel.role = channel_proto.role();
+            existing_channel.name = channel_proto.name;
+        } else {
+            self.channels_by_id.insert(
+                channel_proto.id,
+                Arc::new(Channel {
+                    id: channel_proto.id,
+                    visibility: channel_proto.visibility(),
+                    role: channel_proto.role(),
+                    name: channel_proto.name,
+                    unseen_note_version: None,
+                    unseen_message_id: None,
+                    parent_path: channel_proto.parent_path,
+                }),
+            );
+            self.insert_root(channel_proto.id);
+        }
+        ret
+    }
+
+    fn insert_root(&mut self, channel_id: ChannelId) {
+        self.channels_ordered.push(channel_id);
+    }
+}
+
+impl<'a> Drop for ChannelPathsInsertGuard<'a> {
+    fn drop(&mut self) {
+        self.channels_ordered.sort_by(|a, b| {
+            let a = channel_path_sorting_key(*a, &self.channels_by_id);
+            let b = channel_path_sorting_key(*b, &self.channels_by_id);
+            a.cmp(b)
+        });
+        self.channels_ordered.dedup();
+    }
+}
+
+fn channel_path_sorting_key<'a>(
+    id: ChannelId,
+    channels_by_id: &'a BTreeMap<ChannelId, Arc<Channel>>,
+) -> impl Iterator<Item = &str> {
+    let (parent_path, name) = channels_by_id
+        .get(&id)
+        .map_or((&[] as &[_], None), |channel| {
+            (channel.parent_path.as_slice(), Some(channel.name.as_str()))
+        });
+    parent_path
+        .iter()
+        .filter_map(|id| Some(channels_by_id.get(id)?.name.as_str()))
+        .chain(name)
+}
+
+fn insert_note_changed(
+    channels_by_id: &mut BTreeMap<ChannelId, Arc<Channel>>,
+    channel_id: u64,
+    epoch: u64,
+    version: &clock::Global,
+) {
+    if let Some(channel) = channels_by_id.get_mut(&channel_id) {
+        let unseen_version = Arc::make_mut(channel)
+            .unseen_note_version
+            .get_or_insert((0, clock::Global::new()));
+        if epoch > unseen_version.0 {
+            *unseen_version = (epoch, version.clone());
+        } else {
+            unseen_version.1.join(&version);
+        }
+    }
+}
+
+fn insert_new_message(
+    channels_by_id: &mut BTreeMap<ChannelId, Arc<Channel>>,
+    channel_id: u64,
+    message_id: u64,
+) {
+    if let Some(channel) = channels_by_id.get_mut(&channel_id) {
+        let unseen_message_id = Arc::make_mut(channel).unseen_message_id.get_or_insert(0);
+        *unseen_message_id = message_id.max(*unseen_message_id);
+    }
+}

crates/channel2/src/channel_store_tests.rs 🔗

@@ -0,0 +1,380 @@
+use crate::channel_chat::ChannelChatEvent;
+
+use super::*;
+use client2::{test::FakeServer, Client, UserStore};
+use gpui2::{AppContext, Context, Model, TestAppContext};
+use rpc2::proto::{self};
+use settings2::SettingsStore;
+use util::http::FakeHttpClient;
+
+#[gpui2::test]
+fn test_update_channels(cx: &mut AppContext) {
+    let channel_store = init_test(cx);
+
+    update_channels(
+        &channel_store,
+        proto::UpdateChannels {
+            channels: vec![
+                proto::Channel {
+                    id: 1,
+                    name: "b".to_string(),
+                    visibility: proto::ChannelVisibility::Members as i32,
+                    role: proto::ChannelRole::Admin.into(),
+                    parent_path: Vec::new(),
+                },
+                proto::Channel {
+                    id: 2,
+                    name: "a".to_string(),
+                    visibility: proto::ChannelVisibility::Members as i32,
+                    role: proto::ChannelRole::Member.into(),
+                    parent_path: Vec::new(),
+                },
+            ],
+            ..Default::default()
+        },
+        cx,
+    );
+    assert_channels(
+        &channel_store,
+        &[
+            //
+            (0, "a".to_string(), proto::ChannelRole::Member),
+            (0, "b".to_string(), proto::ChannelRole::Admin),
+        ],
+        cx,
+    );
+
+    update_channels(
+        &channel_store,
+        proto::UpdateChannels {
+            channels: vec![
+                proto::Channel {
+                    id: 3,
+                    name: "x".to_string(),
+                    visibility: proto::ChannelVisibility::Members as i32,
+                    role: proto::ChannelRole::Admin.into(),
+                    parent_path: vec![1],
+                },
+                proto::Channel {
+                    id: 4,
+                    name: "y".to_string(),
+                    visibility: proto::ChannelVisibility::Members as i32,
+                    role: proto::ChannelRole::Member.into(),
+                    parent_path: vec![2],
+                },
+            ],
+            ..Default::default()
+        },
+        cx,
+    );
+    assert_channels(
+        &channel_store,
+        &[
+            (0, "a".to_string(), proto::ChannelRole::Member),
+            (1, "y".to_string(), proto::ChannelRole::Member),
+            (0, "b".to_string(), proto::ChannelRole::Admin),
+            (1, "x".to_string(), proto::ChannelRole::Admin),
+        ],
+        cx,
+    );
+}
+
+#[gpui2::test]
+fn test_dangling_channel_paths(cx: &mut AppContext) {
+    let channel_store = init_test(cx);
+
+    update_channels(
+        &channel_store,
+        proto::UpdateChannels {
+            channels: vec![
+                proto::Channel {
+                    id: 0,
+                    name: "a".to_string(),
+                    visibility: proto::ChannelVisibility::Members as i32,
+                    role: proto::ChannelRole::Admin.into(),
+                    parent_path: vec![],
+                },
+                proto::Channel {
+                    id: 1,
+                    name: "b".to_string(),
+                    visibility: proto::ChannelVisibility::Members as i32,
+                    role: proto::ChannelRole::Admin.into(),
+                    parent_path: vec![0],
+                },
+                proto::Channel {
+                    id: 2,
+                    name: "c".to_string(),
+                    visibility: proto::ChannelVisibility::Members as i32,
+                    role: proto::ChannelRole::Admin.into(),
+                    parent_path: vec![0, 1],
+                },
+            ],
+            ..Default::default()
+        },
+        cx,
+    );
+    // Sanity check
+    assert_channels(
+        &channel_store,
+        &[
+            //
+            (0, "a".to_string(), proto::ChannelRole::Admin),
+            (1, "b".to_string(), proto::ChannelRole::Admin),
+            (2, "c".to_string(), proto::ChannelRole::Admin),
+        ],
+        cx,
+    );
+
+    update_channels(
+        &channel_store,
+        proto::UpdateChannels {
+            delete_channels: vec![1, 2],
+            ..Default::default()
+        },
+        cx,
+    );
+
+    // Make sure that the 1/2/3 path is gone
+    assert_channels(
+        &channel_store,
+        &[(0, "a".to_string(), proto::ChannelRole::Admin)],
+        cx,
+    );
+}
+
+#[gpui2::test]
+async fn test_channel_messages(cx: &mut TestAppContext) {
+    let user_id = 5;
+    let channel_id = 5;
+    let channel_store = cx.update(init_test);
+    let client = channel_store.update(cx, |s, _| s.client());
+    let server = FakeServer::for_client(user_id, &client, cx).await;
+
+    // Get the available channels.
+    server.send(proto::UpdateChannels {
+        channels: vec![proto::Channel {
+            id: channel_id,
+            name: "the-channel".to_string(),
+            visibility: proto::ChannelVisibility::Members as i32,
+            role: proto::ChannelRole::Member.into(),
+            parent_path: vec![],
+        }],
+        ..Default::default()
+    });
+    cx.executor().run_until_parked();
+    cx.update(|cx| {
+        assert_channels(
+            &channel_store,
+            &[(0, "the-channel".to_string(), proto::ChannelRole::Member)],
+            cx,
+        );
+    });
+
+    let get_users = server.receive::<proto::GetUsers>().await.unwrap();
+    assert_eq!(get_users.payload.user_ids, vec![5]);
+    server.respond(
+        get_users.receipt(),
+        proto::UsersResponse {
+            users: vec![proto::User {
+                id: 5,
+                github_login: "nathansobo".into(),
+                avatar_url: "http://avatar.com/nathansobo".into(),
+            }],
+        },
+    );
+
+    // Join a channel and populate its existing messages.
+    let channel = channel_store.update(cx, |store, cx| {
+        let channel_id = store.ordered_channels().next().unwrap().1.id;
+        store.open_channel_chat(channel_id, cx)
+    });
+    let join_channel = server.receive::<proto::JoinChannelChat>().await.unwrap();
+    server.respond(
+        join_channel.receipt(),
+        proto::JoinChannelChatResponse {
+            messages: vec![
+                proto::ChannelMessage {
+                    id: 10,
+                    body: "a".into(),
+                    timestamp: 1000,
+                    sender_id: 5,
+                    mentions: vec![],
+                    nonce: Some(1.into()),
+                },
+                proto::ChannelMessage {
+                    id: 11,
+                    body: "b".into(),
+                    timestamp: 1001,
+                    sender_id: 6,
+                    mentions: vec![],
+                    nonce: Some(2.into()),
+                },
+            ],
+            done: false,
+        },
+    );
+
+    cx.executor().start_waiting();
+
+    // Client requests all users for the received messages
+    let mut get_users = server.receive::<proto::GetUsers>().await.unwrap();
+    get_users.payload.user_ids.sort();
+    assert_eq!(get_users.payload.user_ids, vec![6]);
+    server.respond(
+        get_users.receipt(),
+        proto::UsersResponse {
+            users: vec![proto::User {
+                id: 6,
+                github_login: "maxbrunsfeld".into(),
+                avatar_url: "http://avatar.com/maxbrunsfeld".into(),
+            }],
+        },
+    );
+
+    let channel = channel.await.unwrap();
+    channel.update(cx, |channel, _| {
+        assert_eq!(
+            channel
+                .messages_in_range(0..2)
+                .map(|message| (message.sender.github_login.clone(), message.body.clone()))
+                .collect::<Vec<_>>(),
+            &[
+                ("nathansobo".into(), "a".into()),
+                ("maxbrunsfeld".into(), "b".into())
+            ]
+        );
+    });
+
+    // Receive a new message.
+    server.send(proto::ChannelMessageSent {
+        channel_id,
+        message: Some(proto::ChannelMessage {
+            id: 12,
+            body: "c".into(),
+            timestamp: 1002,
+            sender_id: 7,
+            mentions: vec![],
+            nonce: Some(3.into()),
+        }),
+    });
+
+    // Client requests user for message since they haven't seen them yet
+    let get_users = server.receive::<proto::GetUsers>().await.unwrap();
+    assert_eq!(get_users.payload.user_ids, vec![7]);
+    server.respond(
+        get_users.receipt(),
+        proto::UsersResponse {
+            users: vec![proto::User {
+                id: 7,
+                github_login: "as-cii".into(),
+                avatar_url: "http://avatar.com/as-cii".into(),
+            }],
+        },
+    );
+
+    assert_eq!(
+        channel.next_event(cx),
+        ChannelChatEvent::MessagesUpdated {
+            old_range: 2..2,
+            new_count: 1,
+        }
+    );
+    channel.update(cx, |channel, _| {
+        assert_eq!(
+            channel
+                .messages_in_range(2..3)
+                .map(|message| (message.sender.github_login.clone(), message.body.clone()))
+                .collect::<Vec<_>>(),
+            &[("as-cii".into(), "c".into())]
+        )
+    });
+
+    // Scroll up to view older messages.
+    channel.update(cx, |channel, cx| {
+        channel.load_more_messages(cx).unwrap().detach();
+    });
+    let get_messages = server.receive::<proto::GetChannelMessages>().await.unwrap();
+    assert_eq!(get_messages.payload.channel_id, 5);
+    assert_eq!(get_messages.payload.before_message_id, 10);
+    server.respond(
+        get_messages.receipt(),
+        proto::GetChannelMessagesResponse {
+            done: true,
+            messages: vec![
+                proto::ChannelMessage {
+                    id: 8,
+                    body: "y".into(),
+                    timestamp: 998,
+                    sender_id: 5,
+                    nonce: Some(4.into()),
+                    mentions: vec![],
+                },
+                proto::ChannelMessage {
+                    id: 9,
+                    body: "z".into(),
+                    timestamp: 999,
+                    sender_id: 6,
+                    nonce: Some(5.into()),
+                    mentions: vec![],
+                },
+            ],
+        },
+    );
+
+    assert_eq!(
+        channel.next_event(cx),
+        ChannelChatEvent::MessagesUpdated {
+            old_range: 0..0,
+            new_count: 2,
+        }
+    );
+    channel.update(cx, |channel, _| {
+        assert_eq!(
+            channel
+                .messages_in_range(0..2)
+                .map(|message| (message.sender.github_login.clone(), message.body.clone()))
+                .collect::<Vec<_>>(),
+            &[
+                ("nathansobo".into(), "y".into()),
+                ("maxbrunsfeld".into(), "z".into())
+            ]
+        );
+    });
+}
+
+fn init_test(cx: &mut AppContext) -> Model<ChannelStore> {
+    let http = FakeHttpClient::with_404_response();
+    let client = Client::new(http.clone(), cx);
+    let user_store = cx.build_model(|cx| UserStore::new(client.clone(), http, cx));
+
+    let settings_store = SettingsStore::test(cx);
+    cx.set_global(settings_store);
+    client2::init(&client, cx);
+    crate::init(&client, user_store, cx);
+
+    ChannelStore::global(cx)
+}
+
+fn update_channels(
+    channel_store: &Model<ChannelStore>,
+    message: proto::UpdateChannels,
+    cx: &mut AppContext,
+) {
+    let task = channel_store.update(cx, |store, cx| store.update_channels(message, cx));
+    assert!(task.is_none());
+}
+
+#[track_caller]
+fn assert_channels(
+    channel_store: &Model<ChannelStore>,
+    expected_channels: &[(usize, String, proto::ChannelRole)],
+    cx: &mut AppContext,
+) {
+    let actual = channel_store.update(cx, |store, _| {
+        store
+            .ordered_channels()
+            .map(|(depth, channel)| (depth, channel.name.to_string(), channel.role))
+            .collect::<Vec<_>>()
+    });
+    assert_eq!(actual, expected_channels);
+}

crates/client2/src/user.rs 🔗

@@ -292,22 +292,18 @@ impl UserStore {
                         .upgrade()
                         .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
                     for contact in message.contacts {
-                        let should_notify = contact.should_notify;
-                        updated_contacts.push((
-                            Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
-                            should_notify,
+                        updated_contacts.push(Arc::new(
+                            Contact::from_proto(contact, &this, &mut cx).await?,
                         ));
                     }
 
                     let mut incoming_requests = Vec::new();
                     for request in message.incoming_requests {
                         incoming_requests.push({
-                            let user = this
-                                .update(&mut cx, |this, cx| {
-                                    this.get_user(request.requester_id, cx)
-                                })?
-                                .await?;
-                            (user, request.should_notify)
+                            this.update(&mut cx, |this, cx| {
+                                this.get_user(request.requester_id, cx)
+                            })?
+                            .await?
                         });
                     }
 
@@ -331,13 +327,7 @@ impl UserStore {
                         this.contacts
                             .retain(|contact| !removed_contacts.contains(&contact.user.id));
                         // Update existing contacts and insert new ones
-                        for (updated_contact, should_notify) in updated_contacts {
-                            if should_notify {
-                                cx.emit(Event::Contact {
-                                    user: updated_contact.user.clone(),
-                                    kind: ContactEventKind::Accepted,
-                                });
-                            }
+                        for updated_contact in updated_contacts {
                             match this.contacts.binary_search_by_key(
                                 &&updated_contact.user.github_login,
                                 |contact| &contact.user.github_login,
@@ -360,14 +350,7 @@ impl UserStore {
                             }
                         });
                         // Update existing incoming requests and insert new ones
-                        for (user, should_notify) in incoming_requests {
-                            if should_notify {
-                                cx.emit(Event::Contact {
-                                    user: user.clone(),
-                                    kind: ContactEventKind::Requested,
-                                });
-                            }
-
+                        for user in incoming_requests {
                             match this
                                 .incoming_contact_requests
                                 .binary_search_by_key(&&user.github_login, |contact| {

crates/gpui2/src/app/test_context.rs 🔗

@@ -189,3 +189,22 @@ impl TestAppContext {
         .unwrap();
     }
 }
+
+impl<T: Send + EventEmitter> Model<T> {
+    pub fn next_event(&self, cx: &mut TestAppContext) -> T::Event
+    where
+        T::Event: Send + Clone,
+    {
+        let (tx, mut rx) = futures::channel::mpsc::unbounded();
+        let _subscription = self.update(cx, |_, cx| {
+            cx.subscribe(self, move |_, _, event, _| {
+                tx.unbounded_send(event.clone()).ok();
+            })
+        });
+
+        cx.executor().run_until_parked();
+        rx.try_next()
+            .expect("no event received")
+            .expect("model was dropped")
+    }
+}

crates/rpc2/proto/zed.proto 🔗

@@ -89,88 +89,96 @@ message Envelope {
         FormatBuffersResponse format_buffers_response = 70;
         GetCompletions get_completions = 71;
         GetCompletionsResponse get_completions_response = 72;
-        ApplyCompletionAdditionalEdits apply_completion_additional_edits = 73;
-        ApplyCompletionAdditionalEditsResponse apply_completion_additional_edits_response = 74;
-        GetCodeActions get_code_actions = 75;
-        GetCodeActionsResponse get_code_actions_response = 76;
-        GetHover get_hover = 77;
-        GetHoverResponse get_hover_response = 78;
-        ApplyCodeAction apply_code_action = 79;
-        ApplyCodeActionResponse apply_code_action_response = 80;
-        PrepareRename prepare_rename = 81;
-        PrepareRenameResponse prepare_rename_response = 82;
-        PerformRename perform_rename = 83;
-        PerformRenameResponse perform_rename_response = 84;
-        SearchProject search_project = 85;
-        SearchProjectResponse search_project_response = 86;
-
-        UpdateContacts update_contacts = 87;
-        UpdateInviteInfo update_invite_info = 88;
-        ShowContacts show_contacts = 89;
-
-        GetUsers get_users = 90;
-        FuzzySearchUsers fuzzy_search_users = 91;
-        UsersResponse users_response = 92;
-        RequestContact request_contact = 93;
-        RespondToContactRequest respond_to_contact_request = 94;
-        RemoveContact remove_contact = 95;
-
-        Follow follow = 96;
-        FollowResponse follow_response = 97;
-        UpdateFollowers update_followers = 98;
-        Unfollow unfollow = 99;
-        GetPrivateUserInfo get_private_user_info = 100;
-        GetPrivateUserInfoResponse get_private_user_info_response = 101;
-        UpdateDiffBase update_diff_base = 102;
-
-        OnTypeFormatting on_type_formatting = 103;
-        OnTypeFormattingResponse on_type_formatting_response = 104;
-
-        UpdateWorktreeSettings update_worktree_settings = 105;
-
-        InlayHints inlay_hints = 106;
-        InlayHintsResponse inlay_hints_response = 107;
-        ResolveInlayHint resolve_inlay_hint = 108;
-        ResolveInlayHintResponse resolve_inlay_hint_response = 109;
-        RefreshInlayHints refresh_inlay_hints = 110;
-
-        CreateChannel create_channel = 111;
-        CreateChannelResponse create_channel_response = 112;
-        InviteChannelMember invite_channel_member = 113;
-        RemoveChannelMember remove_channel_member = 114;
-        RespondToChannelInvite respond_to_channel_invite = 115;
-        UpdateChannels update_channels = 116;
-        JoinChannel join_channel = 117;
-        DeleteChannel delete_channel = 118;
-        GetChannelMembers get_channel_members = 119;
-        GetChannelMembersResponse get_channel_members_response = 120;
-        SetChannelMemberAdmin set_channel_member_admin = 121;
-        RenameChannel rename_channel = 122;
-        RenameChannelResponse rename_channel_response = 123;
-
-        JoinChannelBuffer join_channel_buffer = 124;
-        JoinChannelBufferResponse join_channel_buffer_response = 125;
-        UpdateChannelBuffer update_channel_buffer = 126;
-        LeaveChannelBuffer leave_channel_buffer = 127;
-        UpdateChannelBufferCollaborators update_channel_buffer_collaborators = 128;
-        RejoinChannelBuffers rejoin_channel_buffers = 129;
-        RejoinChannelBuffersResponse rejoin_channel_buffers_response = 130;
-        AckBufferOperation ack_buffer_operation = 143;
-
-        JoinChannelChat join_channel_chat = 131;
-        JoinChannelChatResponse join_channel_chat_response = 132;
-        LeaveChannelChat leave_channel_chat = 133;
-        SendChannelMessage send_channel_message = 134;
-        SendChannelMessageResponse send_channel_message_response = 135;
-        ChannelMessageSent channel_message_sent = 136;
-        GetChannelMessages get_channel_messages = 137;
-        GetChannelMessagesResponse get_channel_messages_response = 138;
-        RemoveChannelMessage remove_channel_message = 139;
-        AckChannelMessage ack_channel_message = 144;
-
-        LinkChannel link_channel = 140;
-        UnlinkChannel unlink_channel = 141;
-        MoveChannel move_channel = 142; // current max: 144
+        ResolveCompletionDocumentation resolve_completion_documentation = 73;
+        ResolveCompletionDocumentationResponse resolve_completion_documentation_response = 74;
+        ApplyCompletionAdditionalEdits apply_completion_additional_edits = 75;
+        ApplyCompletionAdditionalEditsResponse apply_completion_additional_edits_response = 76;
+        GetCodeActions get_code_actions = 77;
+        GetCodeActionsResponse get_code_actions_response = 78;
+        GetHover get_hover = 79;
+        GetHoverResponse get_hover_response = 80;
+        ApplyCodeAction apply_code_action = 81;
+        ApplyCodeActionResponse apply_code_action_response = 82;
+        PrepareRename prepare_rename = 83;
+        PrepareRenameResponse prepare_rename_response = 84;
+        PerformRename perform_rename = 85;
+        PerformRenameResponse perform_rename_response = 86;
+        SearchProject search_project = 87;
+        SearchProjectResponse search_project_response = 88;
+
+        UpdateContacts update_contacts = 89;
+        UpdateInviteInfo update_invite_info = 90;
+        ShowContacts show_contacts = 91;
+
+        GetUsers get_users = 92;
+        FuzzySearchUsers fuzzy_search_users = 93;
+        UsersResponse users_response = 94;
+        RequestContact request_contact = 95;
+        RespondToContactRequest respond_to_contact_request = 96;
+        RemoveContact remove_contact = 97;
+
+        Follow follow = 98;
+        FollowResponse follow_response = 99;
+        UpdateFollowers update_followers = 100;
+        Unfollow unfollow = 101;
+        GetPrivateUserInfo get_private_user_info = 102;
+        GetPrivateUserInfoResponse get_private_user_info_response = 103;
+        UpdateDiffBase update_diff_base = 104;
+
+        OnTypeFormatting on_type_formatting = 105;
+        OnTypeFormattingResponse on_type_formatting_response = 106;
+
+        UpdateWorktreeSettings update_worktree_settings = 107;
+
+        InlayHints inlay_hints = 108;
+        InlayHintsResponse inlay_hints_response = 109;
+        ResolveInlayHint resolve_inlay_hint = 110;
+        ResolveInlayHintResponse resolve_inlay_hint_response = 111;
+        RefreshInlayHints refresh_inlay_hints = 112;
+
+        CreateChannel create_channel = 113;
+        CreateChannelResponse create_channel_response = 114;
+        InviteChannelMember invite_channel_member = 115;
+        RemoveChannelMember remove_channel_member = 116;
+        RespondToChannelInvite respond_to_channel_invite = 117;
+        UpdateChannels update_channels = 118;
+        JoinChannel join_channel = 119;
+        DeleteChannel delete_channel = 120;
+        GetChannelMembers get_channel_members = 121;
+        GetChannelMembersResponse get_channel_members_response = 122;
+        SetChannelMemberRole set_channel_member_role = 123;
+        RenameChannel rename_channel = 124;
+        RenameChannelResponse rename_channel_response = 125;
+
+        JoinChannelBuffer join_channel_buffer = 126;
+        JoinChannelBufferResponse join_channel_buffer_response = 127;
+        UpdateChannelBuffer update_channel_buffer = 128;
+        LeaveChannelBuffer leave_channel_buffer = 129;
+        UpdateChannelBufferCollaborators update_channel_buffer_collaborators = 130;
+        RejoinChannelBuffers rejoin_channel_buffers = 131;
+        RejoinChannelBuffersResponse rejoin_channel_buffers_response = 132;
+        AckBufferOperation ack_buffer_operation = 133;
+
+        JoinChannelChat join_channel_chat = 134;
+        JoinChannelChatResponse join_channel_chat_response = 135;
+        LeaveChannelChat leave_channel_chat = 136;
+        SendChannelMessage send_channel_message = 137;
+        SendChannelMessageResponse send_channel_message_response = 138;
+        ChannelMessageSent channel_message_sent = 139;
+        GetChannelMessages get_channel_messages = 140;
+        GetChannelMessagesResponse get_channel_messages_response = 141;
+        RemoveChannelMessage remove_channel_message = 142;
+        AckChannelMessage ack_channel_message = 143;
+        GetChannelMessagesById get_channel_messages_by_id = 144;
+
+        MoveChannel move_channel = 147;
+        SetChannelVisibility set_channel_visibility = 148;
+
+        AddNotification add_notification = 149;
+        GetNotifications get_notifications = 150;
+        GetNotificationsResponse get_notifications_response = 151;
+        DeleteNotification delete_notification = 152;
+        MarkNotificationRead mark_notification_read = 153; // Current max
     }
 }
 
@@ -332,6 +340,7 @@ message RoomUpdated {
 message LiveKitConnectionInfo {
     string server_url = 1;
     string token = 2;
+    bool can_publish = 3;
 }
 
 message ShareProject {
@@ -832,6 +841,17 @@ message ResolveState {
     }
 }
 
+message ResolveCompletionDocumentation {
+    uint64 project_id = 1;
+    uint64 language_server_id = 2;
+    bytes lsp_completion = 3;
+}
+
+message ResolveCompletionDocumentationResponse {
+    string text = 1;
+    bool is_markdown = 2;
+}
+
 message ResolveInlayHint {
     uint64 project_id = 1;
     uint64 buffer_id = 2;
@@ -950,13 +970,10 @@ message LspDiskBasedDiagnosticsUpdated {}
 
 message UpdateChannels {
     repeated Channel channels = 1;
-    repeated ChannelEdge insert_edge = 2;
-    repeated ChannelEdge delete_edge = 3;
     repeated uint64 delete_channels = 4;
     repeated Channel channel_invitations = 5;
     repeated uint64 remove_channel_invitations = 6;
     repeated ChannelParticipants channel_participants = 7;
-    repeated ChannelPermission channel_permissions = 8;
     repeated UnseenChannelMessage unseen_channel_messages = 9;
     repeated UnseenChannelBufferChange unseen_channel_buffer_changes = 10;
 }
@@ -972,14 +989,9 @@ message UnseenChannelBufferChange {
     repeated VectorClockEntry version = 3;
 }
 
-message ChannelEdge {
-    uint64 channel_id = 1;
-    uint64 parent_id = 2;
-}
-
 message ChannelPermission {
     uint64 channel_id = 1;
-    bool is_admin = 2;
+    ChannelRole role = 3;
 }
 
 message ChannelParticipants {
@@ -1005,8 +1017,8 @@ message GetChannelMembersResponse {
 
 message ChannelMember {
     uint64 user_id = 1;
-    bool admin = 2;
     Kind kind = 3;
+    ChannelRole role = 4;
 
     enum Kind {
         Member = 0;
@@ -1028,7 +1040,7 @@ message CreateChannelResponse {
 message InviteChannelMember {
     uint64 channel_id = 1;
     uint64 user_id = 2;
-    bool admin = 3;
+    ChannelRole role = 4;
 }
 
 message RemoveChannelMember {
@@ -1036,10 +1048,22 @@ message RemoveChannelMember {
     uint64 user_id = 2;
 }
 
-message SetChannelMemberAdmin {
+enum ChannelRole {
+    Admin = 0;
+    Member = 1;
+    Guest = 2;
+    Banned = 3;
+}
+
+message SetChannelMemberRole {
     uint64 channel_id = 1;
     uint64 user_id = 2;
-    bool admin = 3;
+    ChannelRole role = 3;
+}
+
+message SetChannelVisibility {
+    uint64 channel_id = 1;
+    ChannelVisibility visibility = 2;
 }
 
 message RenameChannel {
@@ -1068,6 +1092,7 @@ message SendChannelMessage {
     uint64 channel_id = 1;
     string body = 2;
     Nonce nonce = 3;
+    repeated ChatMention mentions = 4;
 }
 
 message RemoveChannelMessage {
@@ -1099,20 +1124,13 @@ message GetChannelMessagesResponse {
     bool done = 2;
 }
 
-message LinkChannel {
-    uint64 channel_id = 1;
-    uint64 to = 2;
-}
-
-message UnlinkChannel {
-    uint64 channel_id = 1;
-    uint64 from = 2;
+message GetChannelMessagesById {
+    repeated uint64 message_ids = 1;
 }
 
 message MoveChannel {
     uint64 channel_id = 1;
-    uint64 from = 2;
-    uint64 to = 3;
+    optional uint64 to = 2;
 }
 
 message JoinChannelBuffer {
@@ -1125,6 +1143,12 @@ message ChannelMessage {
     uint64 timestamp = 3;
     uint64 sender_id = 4;
     Nonce nonce = 5;
+    repeated ChatMention mentions = 6;
+}
+
+message ChatMention {
+    Range range = 1;
+    uint64 user_id = 2;
 }
 
 message RejoinChannelBuffers {
@@ -1216,7 +1240,6 @@ message ShowContacts {}
 
 message IncomingContactRequest {
     uint64 requester_id = 1;
-    bool should_notify = 2;
 }
 
 message UpdateDiagnostics {
@@ -1533,16 +1556,23 @@ message Nonce {
     uint64 lower_half = 2;
 }
 
+enum ChannelVisibility {
+    Public = 0;
+    Members = 1;
+}
+
 message Channel {
     uint64 id = 1;
     string name = 2;
+    ChannelVisibility visibility = 3;
+    ChannelRole role = 4;
+    repeated uint64 parent_path = 5;
 }
 
 message Contact {
     uint64 user_id = 1;
     bool online = 2;
     bool busy = 3;
-    bool should_notify = 4;
 }
 
 message WorktreeMetadata {
@@ -1557,3 +1587,34 @@ message UpdateDiffBase {
     uint64 buffer_id = 2;
     optional string diff_base = 3;
 }
+
+message GetNotifications {
+    optional uint64 before_id = 1;
+}
+
+message AddNotification {
+    Notification notification = 1;
+}
+
+message GetNotificationsResponse {
+    repeated Notification notifications = 1;
+    bool done = 2;
+}
+
+message DeleteNotification {
+    uint64 notification_id = 1;
+}
+
+message MarkNotificationRead {
+    uint64 notification_id = 1;
+}
+
+message Notification {
+    uint64 id = 1;
+    uint64 timestamp = 2;
+    string kind = 3;
+    optional uint64 entity_id = 4;
+    string content = 5;
+    bool is_read = 6;
+    optional bool response = 7;
+}

crates/rpc2/src/proto.rs 🔗

@@ -133,6 +133,9 @@ impl fmt::Display for PeerId {
 
 messages!(
     (Ack, Foreground),
+    (AckBufferOperation, Background),
+    (AckChannelMessage, Background),
+    (AddNotification, Foreground),
     (AddProjectCollaborator, Foreground),
     (ApplyCodeAction, Background),
     (ApplyCodeActionResponse, Background),
@@ -143,57 +146,74 @@ messages!(
     (Call, Foreground),
     (CallCanceled, Foreground),
     (CancelCall, Foreground),
+    (ChannelMessageSent, Foreground),
     (CopyProjectEntry, Foreground),
     (CreateBufferForPeer, Foreground),
     (CreateChannel, Foreground),
     (CreateChannelResponse, Foreground),
-    (ChannelMessageSent, Foreground),
     (CreateProjectEntry, Foreground),
     (CreateRoom, Foreground),
     (CreateRoomResponse, Foreground),
     (DeclineCall, Foreground),
+    (DeleteChannel, Foreground),
+    (DeleteNotification, Foreground),
     (DeleteProjectEntry, Foreground),
     (Error, Foreground),
     (ExpandProjectEntry, Foreground),
+    (ExpandProjectEntryResponse, Foreground),
     (Follow, Foreground),
     (FollowResponse, Foreground),
     (FormatBuffers, Foreground),
     (FormatBuffersResponse, Foreground),
     (FuzzySearchUsers, Foreground),
-    (GetCodeActions, Background),
-    (GetCodeActionsResponse, Background),
-    (GetHover, Background),
-    (GetHoverResponse, Background),
+    (GetChannelMembers, Foreground),
+    (GetChannelMembersResponse, Foreground),
     (GetChannelMessages, Background),
+    (GetChannelMessagesById, Background),
     (GetChannelMessagesResponse, Background),
-    (SendChannelMessage, Background),
-    (SendChannelMessageResponse, Background),
+    (GetCodeActions, Background),
+    (GetCodeActionsResponse, Background),
     (GetCompletions, Background),
     (GetCompletionsResponse, Background),
     (GetDefinition, Background),
     (GetDefinitionResponse, Background),
-    (GetTypeDefinition, Background),
-    (GetTypeDefinitionResponse, Background),
     (GetDocumentHighlights, Background),
     (GetDocumentHighlightsResponse, Background),
-    (GetReferences, Background),
-    (GetReferencesResponse, Background),
+    (GetHover, Background),
+    (GetHoverResponse, Background),
+    (GetNotifications, Foreground),
+    (GetNotificationsResponse, Foreground),
+    (GetPrivateUserInfo, Foreground),
+    (GetPrivateUserInfoResponse, Foreground),
     (GetProjectSymbols, Background),
     (GetProjectSymbolsResponse, Background),
+    (GetReferences, Background),
+    (GetReferencesResponse, Background),
+    (GetTypeDefinition, Background),
+    (GetTypeDefinitionResponse, Background),
     (GetUsers, Foreground),
     (Hello, Foreground),
     (IncomingCall, Foreground),
+    (InlayHints, Background),
+    (InlayHintsResponse, Background),
     (InviteChannelMember, Foreground),
-    (UsersResponse, Foreground),
+    (JoinChannel, Foreground),
+    (JoinChannelBuffer, Foreground),
+    (JoinChannelBufferResponse, Foreground),
+    (JoinChannelChat, Foreground),
+    (JoinChannelChatResponse, Foreground),
     (JoinProject, Foreground),
     (JoinProjectResponse, Foreground),
     (JoinRoom, Foreground),
     (JoinRoomResponse, Foreground),
-    (JoinChannelChat, Foreground),
-    (JoinChannelChatResponse, Foreground),
+    (LeaveChannelBuffer, Background),
     (LeaveChannelChat, Foreground),
     (LeaveProject, Foreground),
     (LeaveRoom, Foreground),
+    (MarkNotificationRead, Foreground),
+    (MoveChannel, Foreground),
+    (OnTypeFormatting, Background),
+    (OnTypeFormattingResponse, Background),
     (OpenBufferById, Background),
     (OpenBufferByPath, Background),
     (OpenBufferForSymbol, Background),
@@ -201,58 +221,56 @@ messages!(
     (OpenBufferResponse, Background),
     (PerformRename, Background),
     (PerformRenameResponse, Background),
-    (OnTypeFormatting, Background),
-    (OnTypeFormattingResponse, Background),
-    (InlayHints, Background),
-    (InlayHintsResponse, Background),
-    (ResolveInlayHint, Background),
-    (ResolveInlayHintResponse, Background),
-    (RefreshInlayHints, Foreground),
     (Ping, Foreground),
     (PrepareRename, Background),
     (PrepareRenameResponse, Background),
-    (ExpandProjectEntryResponse, Foreground),
     (ProjectEntryResponse, Foreground),
+    (RefreshInlayHints, Foreground),
+    (RejoinChannelBuffers, Foreground),
+    (RejoinChannelBuffersResponse, Foreground),
     (RejoinRoom, Foreground),
     (RejoinRoomResponse, Foreground),
-    (RemoveContact, Foreground),
-    (RemoveChannelMember, Foreground),
-    (RemoveChannelMessage, Foreground),
     (ReloadBuffers, Foreground),
     (ReloadBuffersResponse, Foreground),
+    (RemoveChannelMember, Foreground),
+    (RemoveChannelMessage, Foreground),
+    (RemoveContact, Foreground),
     (RemoveProjectCollaborator, Foreground),
+    (RenameChannel, Foreground),
+    (RenameChannelResponse, Foreground),
     (RenameProjectEntry, Foreground),
     (RequestContact, Foreground),
-    (RespondToContactRequest, Foreground),
+    (ResolveCompletionDocumentation, Background),
+    (ResolveCompletionDocumentationResponse, Background),
+    (ResolveInlayHint, Background),
+    (ResolveInlayHintResponse, Background),
     (RespondToChannelInvite, Foreground),
-    (JoinChannel, Foreground),
+    (RespondToContactRequest, Foreground),
     (RoomUpdated, Foreground),
     (SaveBuffer, Foreground),
-    (RenameChannel, Foreground),
-    (RenameChannelResponse, Foreground),
-    (SetChannelMemberAdmin, Foreground),
+    (SetChannelMemberRole, Foreground),
+    (SetChannelVisibility, Foreground),
     (SearchProject, Background),
     (SearchProjectResponse, Background),
+    (SendChannelMessage, Background),
+    (SendChannelMessageResponse, Background),
     (ShareProject, Foreground),
     (ShareProjectResponse, Foreground),
     (ShowContacts, Foreground),
     (StartLanguageServer, Foreground),
     (SynchronizeBuffers, Foreground),
     (SynchronizeBuffersResponse, Foreground),
-    (RejoinChannelBuffers, Foreground),
-    (RejoinChannelBuffersResponse, Foreground),
     (Test, Foreground),
     (Unfollow, Foreground),
     (UnshareProject, Foreground),
     (UpdateBuffer, Foreground),
     (UpdateBufferFile, Foreground),
-    (UpdateContacts, Foreground),
-    (DeleteChannel, Foreground),
-    (MoveChannel, Foreground),
-    (LinkChannel, Foreground),
-    (UnlinkChannel, Foreground),
+    (UpdateChannelBuffer, Foreground),
+    (UpdateChannelBufferCollaborators, Foreground),
     (UpdateChannels, Foreground),
+    (UpdateContacts, Foreground),
     (UpdateDiagnosticSummary, Foreground),
+    (UpdateDiffBase, Foreground),
     (UpdateFollowers, Foreground),
     (UpdateInviteInfo, Foreground),
     (UpdateLanguageServer, Foreground),
@@ -261,18 +279,7 @@ messages!(
     (UpdateProjectCollaborator, Foreground),
     (UpdateWorktree, Foreground),
     (UpdateWorktreeSettings, Foreground),
-    (UpdateDiffBase, Foreground),
-    (GetPrivateUserInfo, Foreground),
-    (GetPrivateUserInfoResponse, Foreground),
-    (GetChannelMembers, Foreground),
-    (GetChannelMembersResponse, Foreground),
-    (JoinChannelBuffer, Foreground),
-    (JoinChannelBufferResponse, Foreground),
-    (LeaveChannelBuffer, Background),
-    (UpdateChannelBuffer, Foreground),
-    (UpdateChannelBufferCollaborators, Foreground),
-    (AckBufferOperation, Background),
-    (AckChannelMessage, Background),
+    (UsersResponse, Foreground),
 );
 
 request_messages!(
@@ -284,72 +291,78 @@ request_messages!(
     (Call, Ack),
     (CancelCall, Ack),
     (CopyProjectEntry, ProjectEntryResponse),
+    (CreateChannel, CreateChannelResponse),
     (CreateProjectEntry, ProjectEntryResponse),
     (CreateRoom, CreateRoomResponse),
-    (CreateChannel, CreateChannelResponse),
     (DeclineCall, Ack),
+    (DeleteChannel, Ack),
     (DeleteProjectEntry, ProjectEntryResponse),
     (ExpandProjectEntry, ExpandProjectEntryResponse),
     (Follow, FollowResponse),
     (FormatBuffers, FormatBuffersResponse),
+    (FuzzySearchUsers, UsersResponse),
+    (GetChannelMembers, GetChannelMembersResponse),
+    (GetChannelMessages, GetChannelMessagesResponse),
+    (GetChannelMessagesById, GetChannelMessagesResponse),
     (GetCodeActions, GetCodeActionsResponse),
-    (GetHover, GetHoverResponse),
     (GetCompletions, GetCompletionsResponse),
     (GetDefinition, GetDefinitionResponse),
-    (GetTypeDefinition, GetTypeDefinitionResponse),
     (GetDocumentHighlights, GetDocumentHighlightsResponse),
-    (GetReferences, GetReferencesResponse),
+    (GetHover, GetHoverResponse),
+    (GetNotifications, GetNotificationsResponse),
     (GetPrivateUserInfo, GetPrivateUserInfoResponse),
     (GetProjectSymbols, GetProjectSymbolsResponse),
-    (FuzzySearchUsers, UsersResponse),
+    (GetReferences, GetReferencesResponse),
+    (GetTypeDefinition, GetTypeDefinitionResponse),
     (GetUsers, UsersResponse),
+    (IncomingCall, Ack),
+    (InlayHints, InlayHintsResponse),
     (InviteChannelMember, Ack),
+    (JoinChannel, JoinRoomResponse),
+    (JoinChannelBuffer, JoinChannelBufferResponse),
+    (JoinChannelChat, JoinChannelChatResponse),
     (JoinProject, JoinProjectResponse),
     (JoinRoom, JoinRoomResponse),
-    (JoinChannelChat, JoinChannelChatResponse),
+    (LeaveChannelBuffer, Ack),
     (LeaveRoom, Ack),
-    (RejoinRoom, RejoinRoomResponse),
-    (IncomingCall, Ack),
+    (MarkNotificationRead, Ack),
+    (MoveChannel, Ack),
+    (OnTypeFormatting, OnTypeFormattingResponse),
     (OpenBufferById, OpenBufferResponse),
     (OpenBufferByPath, OpenBufferResponse),
     (OpenBufferForSymbol, OpenBufferForSymbolResponse),
-    (Ping, Ack),
     (PerformRename, PerformRenameResponse),
+    (Ping, Ack),
     (PrepareRename, PrepareRenameResponse),
-    (OnTypeFormatting, OnTypeFormattingResponse),
-    (InlayHints, InlayHintsResponse),
-    (ResolveInlayHint, ResolveInlayHintResponse),
     (RefreshInlayHints, Ack),
+    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
+    (RejoinRoom, RejoinRoomResponse),
     (ReloadBuffers, ReloadBuffersResponse),
-    (RequestContact, Ack),
     (RemoveChannelMember, Ack),
-    (RemoveContact, Ack),
-    (RespondToContactRequest, Ack),
-    (RespondToChannelInvite, Ack),
-    (SetChannelMemberAdmin, Ack),
-    (SendChannelMessage, SendChannelMessageResponse),
-    (GetChannelMessages, GetChannelMessagesResponse),
-    (GetChannelMembers, GetChannelMembersResponse),
-    (JoinChannel, JoinRoomResponse),
     (RemoveChannelMessage, Ack),
-    (DeleteChannel, Ack),
-    (RenameProjectEntry, ProjectEntryResponse),
+    (RemoveContact, Ack),
     (RenameChannel, RenameChannelResponse),
-    (LinkChannel, Ack),
-    (UnlinkChannel, Ack),
-    (MoveChannel, Ack),
+    (RenameProjectEntry, ProjectEntryResponse),
+    (RequestContact, Ack),
+    (
+        ResolveCompletionDocumentation,
+        ResolveCompletionDocumentationResponse
+    ),
+    (ResolveInlayHint, ResolveInlayHintResponse),
+    (RespondToChannelInvite, Ack),
+    (RespondToContactRequest, Ack),
     (SaveBuffer, BufferSaved),
     (SearchProject, SearchProjectResponse),
+    (SendChannelMessage, SendChannelMessageResponse),
+    (SetChannelMemberRole, Ack),
+    (SetChannelVisibility, Ack),
     (ShareProject, ShareProjectResponse),
     (SynchronizeBuffers, SynchronizeBuffersResponse),
-    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
     (Test, Test),
     (UpdateBuffer, Ack),
     (UpdateParticipantLocation, Ack),
     (UpdateProject, Ack),
     (UpdateWorktree, Ack),
-    (JoinChannelBuffer, JoinChannelBufferResponse),
-    (LeaveChannelBuffer, Ack)
 );
 
 entity_messages!(
@@ -368,25 +381,26 @@ entity_messages!(
     GetCodeActions,
     GetCompletions,
     GetDefinition,
-    GetTypeDefinition,
     GetDocumentHighlights,
     GetHover,
-    GetReferences,
     GetProjectSymbols,
+    GetReferences,
+    GetTypeDefinition,
+    InlayHints,
     JoinProject,
     LeaveProject,
+    OnTypeFormatting,
     OpenBufferById,
     OpenBufferByPath,
     OpenBufferForSymbol,
     PerformRename,
-    OnTypeFormatting,
-    InlayHints,
-    ResolveInlayHint,
-    RefreshInlayHints,
     PrepareRename,
+    RefreshInlayHints,
     ReloadBuffers,
     RemoveProjectCollaborator,
     RenameProjectEntry,
+    ResolveCompletionDocumentation,
+    ResolveInlayHint,
     SaveBuffer,
     SearchProject,
     StartLanguageServer,
@@ -395,19 +409,19 @@ entity_messages!(
     UpdateBuffer,
     UpdateBufferFile,
     UpdateDiagnosticSummary,
+    UpdateDiffBase,
     UpdateLanguageServer,
     UpdateProject,
     UpdateProjectCollaborator,
     UpdateWorktree,
     UpdateWorktreeSettings,
-    UpdateDiffBase
 );
 
 entity_messages!(
     channel_id,
     ChannelMessageSent,
-    UpdateChannelBuffer,
     RemoveChannelMessage,
+    UpdateChannelBuffer,
     UpdateChannelBufferCollaborators,
 );