Detailed changes
@@ -1347,6 +1347,43 @@ dependencies = [
"uuid 1.4.1",
]
+[[package]]
+name = "channel2"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "client2",
+ "clock",
+ "collections",
+ "db2",
+ "feature_flags2",
+ "futures 0.3.28",
+ "gpui2",
+ "image",
+ "language2",
+ "lazy_static",
+ "log",
+ "parking_lot 0.11.2",
+ "postage",
+ "rand 0.8.5",
+ "rpc2",
+ "schemars",
+ "serde",
+ "serde_derive",
+ "settings2",
+ "smallvec",
+ "smol",
+ "sum_tree",
+ "tempfile",
+ "text",
+ "thiserror",
+ "time",
+ "tiny_http",
+ "url",
+ "util",
+ "uuid 1.4.1",
+]
+
[[package]]
name = "chrono"
version = "0.4.31"
@@ -10,6 +10,7 @@ members = [
"crates/call",
"crates/call2",
"crates/channel",
+ "crates/channel2",
"crates/cli",
"crates/client",
"crates/client2",
@@ -0,0 +1,54 @@
+[package]
+name = "channel2"
+version = "0.1.0"
+edition = "2021"
+publish = false
+
+[lib]
+path = "src/channel2.rs"
+doctest = false
+
+[features]
+test-support = ["collections/test-support", "gpui/test-support", "rpc/test-support"]
+
+[dependencies]
+client = { package = "client2", path = "../client2" }
+collections = { path = "../collections" }
+db = { package = "db2", path = "../db2" }
+gpui = { package = "gpui2", path = "../gpui2" }
+util = { path = "../util" }
+rpc = { package = "rpc2", path = "../rpc2" }
+text = { path = "../text" }
+language = { package = "language2", path = "../language2" }
+settings = { package = "settings2", path = "../settings2" }
+feature_flags = { package = "feature_flags2", path = "../feature_flags2" }
+sum_tree = { path = "../sum_tree" }
+clock = { path = "../clock" }
+
+anyhow.workspace = true
+futures.workspace = true
+image = "0.23"
+lazy_static.workspace = true
+smallvec.workspace = true
+log.workspace = true
+parking_lot.workspace = true
+postage.workspace = true
+rand.workspace = true
+schemars.workspace = true
+smol.workspace = true
+thiserror.workspace = true
+time.workspace = true
+tiny_http = "0.8"
+uuid.workspace = true
+url = "2.2"
+serde.workspace = true
+serde_derive.workspace = true
+tempfile = "3"
+
+[dev-dependencies]
+collections = { path = "../collections", features = ["test-support"] }
+gpui = { package = "gpui2", path = "../gpui2", features = ["test-support"] }
+rpc = { package = "rpc2", path = "../rpc2", features = ["test-support"] }
+client = { package = "client2", path = "../client2", features = ["test-support"] }
+settings = { package = "settings2", path = "../settings2", features = ["test-support"] }
+util = { path = "../util", features = ["test-support"] }
@@ -0,0 +1,23 @@
+mod channel_buffer;
+mod channel_chat;
+mod channel_store;
+
+use client::{Client, UserStore};
+use gpui::{AppContext, Model};
+use std::sync::Arc;
+
+pub use channel_buffer::{ChannelBuffer, ChannelBufferEvent, ACKNOWLEDGE_DEBOUNCE_INTERVAL};
+pub use channel_chat::{
+ mentions_to_proto, ChannelChat, ChannelChatEvent, ChannelMessage, ChannelMessageId,
+ MessageParams,
+};
+pub use channel_store::{Channel, ChannelEvent, ChannelId, ChannelMembership, ChannelStore};
+
+#[cfg(test)]
+mod channel_store_tests;
+
+pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
+ channel_store::init(client, user_store, cx);
+ channel_buffer::init(client);
+ channel_chat::init(client);
+}
@@ -0,0 +1,259 @@
+use crate::{Channel, ChannelId, ChannelStore};
+use anyhow::Result;
+use client::{Client, Collaborator, UserStore};
+use collections::HashMap;
+use gpui::{AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task};
+use language::proto::serialize_version;
+use rpc::{
+ proto::{self, PeerId},
+ TypedEnvelope,
+};
+use std::{sync::Arc, time::Duration};
+use util::ResultExt;
+
+pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250);
+
+pub(crate) fn init(client: &Arc<Client>) {
+ client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
+ client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer_collaborators);
+}
+
+pub struct ChannelBuffer {
+ pub channel_id: ChannelId,
+ connected: bool,
+ collaborators: HashMap<PeerId, Collaborator>,
+ user_store: Model<UserStore>,
+ channel_store: Model<ChannelStore>,
+ buffer: Model<language::Buffer>,
+ buffer_epoch: u64,
+ client: Arc<Client>,
+ subscription: Option<client::Subscription>,
+ acknowledge_task: Option<Task<Result<()>>>,
+}
+
+pub enum ChannelBufferEvent {
+ CollaboratorsChanged,
+ Disconnected,
+ BufferEdited,
+ ChannelChanged,
+}
+
+impl EventEmitter for ChannelBuffer {
+ type Event = ChannelBufferEvent;
+}
+
+impl ChannelBuffer {
+ pub(crate) async fn new(
+ channel: Arc<Channel>,
+ client: Arc<Client>,
+ user_store: Model<UserStore>,
+ channel_store: Model<ChannelStore>,
+ mut cx: AsyncAppContext,
+ ) -> Result<Model<Self>> {
+ let response = client
+ .request(proto::JoinChannelBuffer {
+ channel_id: channel.id,
+ })
+ .await?;
+
+ let base_text = response.base_text;
+ let operations = response
+ .operations
+ .into_iter()
+ .map(language::proto::deserialize_operation)
+ .collect::<Result<Vec<_>, _>>()?;
+
+ let buffer = cx.build_model(|_| {
+ language::Buffer::remote(response.buffer_id, response.replica_id as u16, base_text)
+ })?;
+ buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))??;
+
+ let subscription = client.subscribe_to_entity(channel.id)?;
+
+ anyhow::Ok(cx.build_model(|cx| {
+ cx.subscribe(&buffer, Self::on_buffer_update).detach();
+ cx.on_release(Self::release).detach();
+ let mut this = Self {
+ buffer,
+ buffer_epoch: response.epoch,
+ client,
+ connected: true,
+ collaborators: Default::default(),
+ acknowledge_task: None,
+ channel_id: channel.id,
+ subscription: Some(subscription.set_model(&cx.handle(), &mut cx.to_async())),
+ user_store,
+ channel_store,
+ };
+ this.replace_collaborators(response.collaborators, cx);
+ this
+ })?)
+ }
+
+ fn release(&mut self, _: &mut AppContext) {
+ if self.connected {
+ if let Some(task) = self.acknowledge_task.take() {
+ task.detach();
+ }
+ self.client
+ .send(proto::LeaveChannelBuffer {
+ channel_id: self.channel_id,
+ })
+ .log_err();
+ }
+ }
+
+ pub fn remote_id(&self, cx: &AppContext) -> u64 {
+ self.buffer.read(cx).remote_id()
+ }
+
+ pub fn user_store(&self) -> &Model<UserStore> {
+ &self.user_store
+ }
+
+ pub(crate) fn replace_collaborators(
+ &mut self,
+ collaborators: Vec<proto::Collaborator>,
+ cx: &mut ModelContext<Self>,
+ ) {
+ let mut new_collaborators = HashMap::default();
+ for collaborator in collaborators {
+ if let Ok(collaborator) = Collaborator::from_proto(collaborator) {
+ new_collaborators.insert(collaborator.peer_id, collaborator);
+ }
+ }
+
+ for (_, old_collaborator) in &self.collaborators {
+ if !new_collaborators.contains_key(&old_collaborator.peer_id) {
+ self.buffer.update(cx, |buffer, cx| {
+ buffer.remove_peer(old_collaborator.replica_id as u16, cx)
+ });
+ }
+ }
+ self.collaborators = new_collaborators;
+ cx.emit(ChannelBufferEvent::CollaboratorsChanged);
+ cx.notify();
+ }
+
+ async fn handle_update_channel_buffer(
+ this: Model<Self>,
+ update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
+ _: Arc<Client>,
+ mut cx: AsyncAppContext,
+ ) -> Result<()> {
+ let ops = update_channel_buffer
+ .payload
+ .operations
+ .into_iter()
+ .map(language::proto::deserialize_operation)
+ .collect::<Result<Vec<_>, _>>()?;
+
+ this.update(&mut cx, |this, cx| {
+ cx.notify();
+ this.buffer
+ .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
+ })??;
+
+ Ok(())
+ }
+
+ async fn handle_update_channel_buffer_collaborators(
+ this: Model<Self>,
+ message: TypedEnvelope<proto::UpdateChannelBufferCollaborators>,
+ _: Arc<Client>,
+ mut cx: AsyncAppContext,
+ ) -> Result<()> {
+ this.update(&mut cx, |this, cx| {
+ this.replace_collaborators(message.payload.collaborators, cx);
+ cx.emit(ChannelBufferEvent::CollaboratorsChanged);
+ cx.notify();
+ })
+ }
+
+ fn on_buffer_update(
+ &mut self,
+ _: Model<language::Buffer>,
+ event: &language::Event,
+ cx: &mut ModelContext<Self>,
+ ) {
+ match event {
+ language::Event::Operation(operation) => {
+ let operation = language::proto::serialize_operation(operation);
+ self.client
+ .send(proto::UpdateChannelBuffer {
+ channel_id: self.channel_id,
+ operations: vec![operation],
+ })
+ .log_err();
+ }
+ language::Event::Edited => {
+ cx.emit(ChannelBufferEvent::BufferEdited);
+ }
+ _ => {}
+ }
+ }
+
+ pub fn acknowledge_buffer_version(&mut self, cx: &mut ModelContext<'_, ChannelBuffer>) {
+ let buffer = self.buffer.read(cx);
+ let version = buffer.version();
+ let buffer_id = buffer.remote_id();
+ let client = self.client.clone();
+ let epoch = self.epoch();
+
+ self.acknowledge_task = Some(cx.spawn(move |_, cx| async move {
+ cx.background_executor()
+ .timer(ACKNOWLEDGE_DEBOUNCE_INTERVAL)
+ .await;
+ client
+ .send(proto::AckBufferOperation {
+ buffer_id,
+ epoch,
+ version: serialize_version(&version),
+ })
+ .ok();
+ Ok(())
+ }));
+ }
+
+ pub fn epoch(&self) -> u64 {
+ self.buffer_epoch
+ }
+
+ pub fn buffer(&self) -> Model<language::Buffer> {
+ self.buffer.clone()
+ }
+
+ pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
+ &self.collaborators
+ }
+
+ pub fn channel(&self, cx: &AppContext) -> Option<Arc<Channel>> {
+ self.channel_store
+ .read(cx)
+ .channel_for_id(self.channel_id)
+ .cloned()
+ }
+
+ pub(crate) fn disconnect(&mut self, cx: &mut ModelContext<Self>) {
+ log::info!("channel buffer {} disconnected", self.channel_id);
+ if self.connected {
+ self.connected = false;
+ self.subscription.take();
+ cx.emit(ChannelBufferEvent::Disconnected);
+ cx.notify()
+ }
+ }
+
+ pub(crate) fn channel_changed(&mut self, cx: &mut ModelContext<Self>) {
+ cx.emit(ChannelBufferEvent::ChannelChanged);
+ cx.notify()
+ }
+
+ pub fn is_connected(&self) -> bool {
+ self.connected
+ }
+
+ pub fn replica_id(&self, cx: &AppContext) -> u16 {
+ self.buffer.read(cx).replica_id()
+ }
+}
@@ -0,0 +1,647 @@
+use crate::{Channel, ChannelId, ChannelStore};
+use anyhow::{anyhow, Result};
+use client::{
+ proto,
+ user::{User, UserStore},
+ Client, Subscription, TypedEnvelope, UserId,
+};
+use futures::lock::Mutex;
+use gpui::{AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task};
+use rand::prelude::*;
+use std::{
+ collections::HashSet,
+ mem,
+ ops::{ControlFlow, Range},
+ sync::Arc,
+};
+use sum_tree::{Bias, SumTree};
+use time::OffsetDateTime;
+use util::{post_inc, ResultExt as _, TryFutureExt};
+
+pub struct ChannelChat {
+ pub channel_id: ChannelId,
+ messages: SumTree<ChannelMessage>,
+ acknowledged_message_ids: HashSet<u64>,
+ channel_store: Model<ChannelStore>,
+ loaded_all_messages: bool,
+ last_acknowledged_id: Option<u64>,
+ next_pending_message_id: usize,
+ user_store: Model<UserStore>,
+ rpc: Arc<Client>,
+ outgoing_messages_lock: Arc<Mutex<()>>,
+ rng: StdRng,
+ _subscription: Subscription,
+}
+
+#[derive(Debug, PartialEq, Eq)]
+pub struct MessageParams {
+ pub text: String,
+ pub mentions: Vec<(Range<usize>, UserId)>,
+}
+
+#[derive(Clone, Debug)]
+pub struct ChannelMessage {
+ pub id: ChannelMessageId,
+ pub body: String,
+ pub timestamp: OffsetDateTime,
+ pub sender: Arc<User>,
+ pub nonce: u128,
+ pub mentions: Vec<(Range<usize>, UserId)>,
+}
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub enum ChannelMessageId {
+ Saved(u64),
+ Pending(usize),
+}
+
+#[derive(Clone, Debug, Default)]
+pub struct ChannelMessageSummary {
+ max_id: ChannelMessageId,
+ count: usize,
+}
+
+#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
+struct Count(usize);
+
+#[derive(Clone, Debug, PartialEq)]
+pub enum ChannelChatEvent {
+ MessagesUpdated {
+ old_range: Range<usize>,
+ new_count: usize,
+ },
+ NewMessage {
+ channel_id: ChannelId,
+ message_id: u64,
+ },
+}
+
+impl EventEmitter for ChannelChat {
+ type Event = ChannelChatEvent;
+}
+pub fn init(client: &Arc<Client>) {
+ client.add_model_message_handler(ChannelChat::handle_message_sent);
+ client.add_model_message_handler(ChannelChat::handle_message_removed);
+}
+
+impl ChannelChat {
+ pub async fn new(
+ channel: Arc<Channel>,
+ channel_store: Model<ChannelStore>,
+ user_store: Model<UserStore>,
+ client: Arc<Client>,
+ mut cx: AsyncAppContext,
+ ) -> Result<Model<Self>> {
+ let channel_id = channel.id;
+ let subscription = client.subscribe_to_entity(channel_id).unwrap();
+
+ let response = client
+ .request(proto::JoinChannelChat { channel_id })
+ .await?;
+ let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?;
+ let loaded_all_messages = response.done;
+
+ Ok(cx.build_model(|cx| {
+ cx.on_release(Self::release).detach();
+ let mut this = Self {
+ channel_id: channel.id,
+ user_store,
+ channel_store,
+ rpc: client,
+ outgoing_messages_lock: Default::default(),
+ messages: Default::default(),
+ acknowledged_message_ids: Default::default(),
+ loaded_all_messages,
+ next_pending_message_id: 0,
+ last_acknowledged_id: None,
+ rng: StdRng::from_entropy(),
+ _subscription: subscription.set_model(&cx.handle(), &mut cx.to_async()),
+ };
+ this.insert_messages(messages, cx);
+ this
+ })?)
+ }
+
+ fn release(&mut self, _: &mut AppContext) {
+ self.rpc
+ .send(proto::LeaveChannelChat {
+ channel_id: self.channel_id,
+ })
+ .log_err();
+ }
+
+ pub fn channel(&self, cx: &AppContext) -> Option<Arc<Channel>> {
+ self.channel_store
+ .read(cx)
+ .channel_for_id(self.channel_id)
+ .cloned()
+ }
+
+ pub fn client(&self) -> &Arc<Client> {
+ &self.rpc
+ }
+
+ pub fn send_message(
+ &mut self,
+ message: MessageParams,
+ cx: &mut ModelContext<Self>,
+ ) -> Result<Task<Result<u64>>> {
+ if message.text.is_empty() {
+ Err(anyhow!("message body can't be empty"))?;
+ }
+
+ let current_user = self
+ .user_store
+ .read(cx)
+ .current_user()
+ .ok_or_else(|| anyhow!("current_user is not present"))?;
+
+ let channel_id = self.channel_id;
+ let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id));
+ let nonce = self.rng.gen();
+ self.insert_messages(
+ SumTree::from_item(
+ ChannelMessage {
+ id: pending_id,
+ body: message.text.clone(),
+ sender: current_user,
+ timestamp: OffsetDateTime::now_utc(),
+ mentions: message.mentions.clone(),
+ nonce,
+ },
+ &(),
+ ),
+ cx,
+ );
+ let user_store = self.user_store.clone();
+ let rpc = self.rpc.clone();
+ let outgoing_messages_lock = self.outgoing_messages_lock.clone();
+ Ok(cx.spawn(move |this, mut cx| async move {
+ let outgoing_message_guard = outgoing_messages_lock.lock().await;
+ let request = rpc.request(proto::SendChannelMessage {
+ channel_id,
+ body: message.text,
+ nonce: Some(nonce.into()),
+ mentions: mentions_to_proto(&message.mentions),
+ });
+ let response = request.await?;
+ drop(outgoing_message_guard);
+ let response = response.message.ok_or_else(|| anyhow!("invalid message"))?;
+ let id = response.id;
+ let message = ChannelMessage::from_proto(response, &user_store, &mut cx).await?;
+ this.update(&mut cx, |this, cx| {
+ this.insert_messages(SumTree::from_item(message, &()), cx);
+ })?;
+ Ok(id)
+ }))
+ }
+
+ pub fn remove_message(&mut self, id: u64, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
+ let response = self.rpc.request(proto::RemoveChannelMessage {
+ channel_id: self.channel_id,
+ message_id: id,
+ });
+ cx.spawn(move |this, mut cx| async move {
+ response.await?;
+ this.update(&mut cx, |this, cx| {
+ this.message_removed(id, cx);
+ })?;
+ Ok(())
+ })
+ }
+
+ pub fn load_more_messages(&mut self, cx: &mut ModelContext<Self>) -> Option<Task<Option<()>>> {
+ if self.loaded_all_messages {
+ return None;
+ }
+
+ let rpc = self.rpc.clone();
+ let user_store = self.user_store.clone();
+ let channel_id = self.channel_id;
+ let before_message_id = self.first_loaded_message_id()?;
+ Some(cx.spawn(move |this, mut cx| {
+ async move {
+ let response = rpc
+ .request(proto::GetChannelMessages {
+ channel_id,
+ before_message_id,
+ })
+ .await?;
+ let loaded_all_messages = response.done;
+ let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?;
+ this.update(&mut cx, |this, cx| {
+ this.loaded_all_messages = loaded_all_messages;
+ this.insert_messages(messages, cx);
+ })?;
+ anyhow::Ok(())
+ }
+ .log_err()
+ }))
+ }
+
+ pub fn first_loaded_message_id(&mut self) -> Option<u64> {
+ self.messages.first().and_then(|message| match message.id {
+ ChannelMessageId::Saved(id) => Some(id),
+ ChannelMessageId::Pending(_) => None,
+ })
+ }
+
+ /// Load all of the chat messages since a certain message id.
+ ///
+ /// For now, we always maintain a suffix of the channel's messages.
+ pub async fn load_history_since_message(
+ chat: Model<Self>,
+ message_id: u64,
+ mut cx: AsyncAppContext,
+ ) -> Option<usize> {
+ loop {
+ let step = chat
+ .update(&mut cx, |chat, cx| {
+ if let Some(first_id) = chat.first_loaded_message_id() {
+ if first_id <= message_id {
+ let mut cursor = chat.messages.cursor::<(ChannelMessageId, Count)>();
+ let message_id = ChannelMessageId::Saved(message_id);
+ cursor.seek(&message_id, Bias::Left, &());
+ return ControlFlow::Break(
+ if cursor
+ .item()
+ .map_or(false, |message| message.id == message_id)
+ {
+ Some(cursor.start().1 .0)
+ } else {
+ None
+ },
+ );
+ }
+ }
+ ControlFlow::Continue(chat.load_more_messages(cx))
+ })
+ .log_err()?;
+ match step {
+ ControlFlow::Break(ix) => return ix,
+ ControlFlow::Continue(task) => task?.await?,
+ }
+ }
+ }
+
+ pub fn acknowledge_last_message(&mut self, cx: &mut ModelContext<Self>) {
+ if let ChannelMessageId::Saved(latest_message_id) = self.messages.summary().max_id {
+ if self
+ .last_acknowledged_id
+ .map_or(true, |acknowledged_id| acknowledged_id < latest_message_id)
+ {
+ self.rpc
+ .send(proto::AckChannelMessage {
+ channel_id: self.channel_id,
+ message_id: latest_message_id,
+ })
+ .ok();
+ self.last_acknowledged_id = Some(latest_message_id);
+ self.channel_store.update(cx, |store, cx| {
+ store.acknowledge_message_id(self.channel_id, latest_message_id, cx);
+ });
+ }
+ }
+ }
+
+ pub fn rejoin(&mut self, cx: &mut ModelContext<Self>) {
+ let user_store = self.user_store.clone();
+ let rpc = self.rpc.clone();
+ let channel_id = self.channel_id;
+ cx.spawn(move |this, mut cx| {
+ async move {
+ let response = rpc.request(proto::JoinChannelChat { channel_id }).await?;
+ let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?;
+ let loaded_all_messages = response.done;
+
+ let pending_messages = this.update(&mut cx, |this, cx| {
+ if let Some((first_new_message, last_old_message)) =
+ messages.first().zip(this.messages.last())
+ {
+ if first_new_message.id > last_old_message.id {
+ let old_messages = mem::take(&mut this.messages);
+ cx.emit(ChannelChatEvent::MessagesUpdated {
+ old_range: 0..old_messages.summary().count,
+ new_count: 0,
+ });
+ this.loaded_all_messages = loaded_all_messages;
+ }
+ }
+
+ this.insert_messages(messages, cx);
+ if loaded_all_messages {
+ this.loaded_all_messages = loaded_all_messages;
+ }
+
+ this.pending_messages().cloned().collect::<Vec<_>>()
+ })?;
+
+ for pending_message in pending_messages {
+ let request = rpc.request(proto::SendChannelMessage {
+ channel_id,
+ body: pending_message.body,
+ mentions: mentions_to_proto(&pending_message.mentions),
+ nonce: Some(pending_message.nonce.into()),
+ });
+ let response = request.await?;
+ let message = ChannelMessage::from_proto(
+ response.message.ok_or_else(|| anyhow!("invalid message"))?,
+ &user_store,
+ &mut cx,
+ )
+ .await?;
+ this.update(&mut cx, |this, cx| {
+ this.insert_messages(SumTree::from_item(message, &()), cx);
+ })?;
+ }
+
+ anyhow::Ok(())
+ }
+ .log_err()
+ })
+ .detach();
+ }
+
+ pub fn message_count(&self) -> usize {
+ self.messages.summary().count
+ }
+
+ pub fn messages(&self) -> &SumTree<ChannelMessage> {
+ &self.messages
+ }
+
+ pub fn message(&self, ix: usize) -> &ChannelMessage {
+ let mut cursor = self.messages.cursor::<Count>();
+ cursor.seek(&Count(ix), Bias::Right, &());
+ cursor.item().unwrap()
+ }
+
+ pub fn acknowledge_message(&mut self, id: u64) {
+ if self.acknowledged_message_ids.insert(id) {
+ self.rpc
+ .send(proto::AckChannelMessage {
+ channel_id: self.channel_id,
+ message_id: id,
+ })
+ .ok();
+ }
+ }
+
+ pub fn messages_in_range(&self, range: Range<usize>) -> impl Iterator<Item = &ChannelMessage> {
+ let mut cursor = self.messages.cursor::<Count>();
+ cursor.seek(&Count(range.start), Bias::Right, &());
+ cursor.take(range.len())
+ }
+
+ pub fn pending_messages(&self) -> impl Iterator<Item = &ChannelMessage> {
+ let mut cursor = self.messages.cursor::<ChannelMessageId>();
+ cursor.seek(&ChannelMessageId::Pending(0), Bias::Left, &());
+ cursor
+ }
+
+ async fn handle_message_sent(
+ this: Model<Self>,
+ message: TypedEnvelope<proto::ChannelMessageSent>,
+ _: Arc<Client>,
+ mut cx: AsyncAppContext,
+ ) -> Result<()> {
+ let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
+ let message = message
+ .payload
+ .message
+ .ok_or_else(|| anyhow!("empty message"))?;
+ let message_id = message.id;
+
+ let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
+ this.update(&mut cx, |this, cx| {
+ this.insert_messages(SumTree::from_item(message, &()), cx);
+ cx.emit(ChannelChatEvent::NewMessage {
+ channel_id: this.channel_id,
+ message_id,
+ })
+ })?;
+
+ Ok(())
+ }
+
+ async fn handle_message_removed(
+ this: Model<Self>,
+ message: TypedEnvelope<proto::RemoveChannelMessage>,
+ _: Arc<Client>,
+ mut cx: AsyncAppContext,
+ ) -> Result<()> {
+ this.update(&mut cx, |this, cx| {
+ this.message_removed(message.payload.message_id, cx)
+ })?;
+ Ok(())
+ }
+
+ fn insert_messages(&mut self, messages: SumTree<ChannelMessage>, cx: &mut ModelContext<Self>) {
+ if let Some((first_message, last_message)) = messages.first().zip(messages.last()) {
+ let nonces = messages
+ .cursor::<()>()
+ .map(|m| m.nonce)
+ .collect::<HashSet<_>>();
+
+ let mut old_cursor = self.messages.cursor::<(ChannelMessageId, Count)>();
+ let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left, &());
+ let start_ix = old_cursor.start().1 .0;
+ let removed_messages = old_cursor.slice(&last_message.id, Bias::Right, &());
+ let removed_count = removed_messages.summary().count;
+ let new_count = messages.summary().count;
+ let end_ix = start_ix + removed_count;
+
+ new_messages.append(messages, &());
+
+ let mut ranges = Vec::<Range<usize>>::new();
+ if new_messages.last().unwrap().is_pending() {
+ new_messages.append(old_cursor.suffix(&()), &());
+ } else {
+ new_messages.append(
+ old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left, &()),
+ &(),
+ );
+
+ while let Some(message) = old_cursor.item() {
+ let message_ix = old_cursor.start().1 .0;
+ if nonces.contains(&message.nonce) {
+ if ranges.last().map_or(false, |r| r.end == message_ix) {
+ ranges.last_mut().unwrap().end += 1;
+ } else {
+ ranges.push(message_ix..message_ix + 1);
+ }
+ } else {
+ new_messages.push(message.clone(), &());
+ }
+ old_cursor.next(&());
+ }
+ }
+
+ drop(old_cursor);
+ self.messages = new_messages;
+
+ for range in ranges.into_iter().rev() {
+ cx.emit(ChannelChatEvent::MessagesUpdated {
+ old_range: range,
+ new_count: 0,
+ });
+ }
+ cx.emit(ChannelChatEvent::MessagesUpdated {
+ old_range: start_ix..end_ix,
+ new_count,
+ });
+
+ cx.notify();
+ }
+ }
+
+ fn message_removed(&mut self, id: u64, cx: &mut ModelContext<Self>) {
+ let mut cursor = self.messages.cursor::<ChannelMessageId>();
+ let mut messages = cursor.slice(&ChannelMessageId::Saved(id), Bias::Left, &());
+ if let Some(item) = cursor.item() {
+ if item.id == ChannelMessageId::Saved(id) {
+ let ix = messages.summary().count;
+ cursor.next(&());
+ messages.append(cursor.suffix(&()), &());
+ drop(cursor);
+ self.messages = messages;
+ cx.emit(ChannelChatEvent::MessagesUpdated {
+ old_range: ix..ix + 1,
+ new_count: 0,
+ });
+ }
+ }
+ }
+}
+
+async fn messages_from_proto(
+ proto_messages: Vec<proto::ChannelMessage>,
+ user_store: &Model<UserStore>,
+ cx: &mut AsyncAppContext,
+) -> Result<SumTree<ChannelMessage>> {
+ let messages = ChannelMessage::from_proto_vec(proto_messages, user_store, cx).await?;
+ let mut result = SumTree::new();
+ result.extend(messages, &());
+ Ok(result)
+}
+
+impl ChannelMessage {
+ pub async fn from_proto(
+ message: proto::ChannelMessage,
+ user_store: &Model<UserStore>,
+ cx: &mut AsyncAppContext,
+ ) -> Result<Self> {
+ let sender = user_store
+ .update(cx, |user_store, cx| {
+ user_store.get_user(message.sender_id, cx)
+ })?
+ .await?;
+ Ok(ChannelMessage {
+ id: ChannelMessageId::Saved(message.id),
+ body: message.body,
+ mentions: message
+ .mentions
+ .into_iter()
+ .filter_map(|mention| {
+ let range = mention.range?;
+ Some((range.start as usize..range.end as usize, mention.user_id))
+ })
+ .collect(),
+ timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?,
+ sender,
+ nonce: message
+ .nonce
+ .ok_or_else(|| anyhow!("nonce is required"))?
+ .into(),
+ })
+ }
+
+ pub fn is_pending(&self) -> bool {
+ matches!(self.id, ChannelMessageId::Pending(_))
+ }
+
+ pub async fn from_proto_vec(
+ proto_messages: Vec<proto::ChannelMessage>,
+ user_store: &Model<UserStore>,
+ cx: &mut AsyncAppContext,
+ ) -> Result<Vec<Self>> {
+ let unique_user_ids = proto_messages
+ .iter()
+ .map(|m| m.sender_id)
+ .collect::<HashSet<_>>()
+ .into_iter()
+ .collect();
+ user_store
+ .update(cx, |user_store, cx| {
+ user_store.get_users(unique_user_ids, cx)
+ })?
+ .await?;
+
+ let mut messages = Vec::with_capacity(proto_messages.len());
+ for message in proto_messages {
+ messages.push(ChannelMessage::from_proto(message, user_store, cx).await?);
+ }
+ Ok(messages)
+ }
+}
+
+pub fn mentions_to_proto(mentions: &[(Range<usize>, UserId)]) -> Vec<proto::ChatMention> {
+ mentions
+ .iter()
+ .map(|(range, user_id)| proto::ChatMention {
+ range: Some(proto::Range {
+ start: range.start as u64,
+ end: range.end as u64,
+ }),
+ user_id: *user_id as u64,
+ })
+ .collect()
+}
+
+impl sum_tree::Item for ChannelMessage {
+ type Summary = ChannelMessageSummary;
+
+ fn summary(&self) -> Self::Summary {
+ ChannelMessageSummary {
+ max_id: self.id,
+ count: 1,
+ }
+ }
+}
+
+impl Default for ChannelMessageId {
+ fn default() -> Self {
+ Self::Saved(0)
+ }
+}
+
+impl sum_tree::Summary for ChannelMessageSummary {
+ type Context = ();
+
+ fn add_summary(&mut self, summary: &Self, _: &()) {
+ self.max_id = summary.max_id;
+ self.count += summary.count;
+ }
+}
+
+impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId {
+ fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
+ debug_assert!(summary.max_id > *self);
+ *self = summary.max_id;
+ }
+}
+
+impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count {
+ fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
+ self.0 += summary.count;
+ }
+}
+
+impl<'a> From<&'a str> for MessageParams {
+ fn from(value: &'a str) -> Self {
+ Self {
+ text: value.into(),
+ mentions: Vec::new(),
+ }
+ }
+}
@@ -0,0 +1,1021 @@
+mod channel_index;
+
+use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
+use anyhow::{anyhow, Result};
+use channel_index::ChannelIndex;
+use client::{Client, Subscription, User, UserId, UserStore};
+use collections::{hash_map, HashMap, HashSet};
+use db::RELEASE_CHANNEL;
+use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
+use gpui::{
+ AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
+};
+use rpc::{
+ proto::{self, ChannelVisibility},
+ TypedEnvelope,
+};
+use std::{mem, sync::Arc, time::Duration};
+use util::{async_maybe, ResultExt};
+
+pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
+ let channel_store =
+ cx.build_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
+ cx.set_global(channel_store);
+}
+
+pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
+
+pub type ChannelId = u64;
+
+pub struct ChannelStore {
+ pub channel_index: ChannelIndex,
+ channel_invitations: Vec<Arc<Channel>>,
+ channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
+ outgoing_invites: HashSet<(ChannelId, UserId)>,
+ update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
+ opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
+ opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
+ client: Arc<Client>,
+ user_store: Model<UserStore>,
+ _rpc_subscription: Subscription,
+ _watch_connection_status: Task<Option<()>>,
+ disconnect_channel_buffers_task: Option<Task<()>>,
+ _update_channels: Task<()>,
+}
+
+#[derive(Clone, Debug, PartialEq)]
+pub struct Channel {
+ pub id: ChannelId,
+ pub name: String,
+ pub visibility: proto::ChannelVisibility,
+ pub role: proto::ChannelRole,
+ pub unseen_note_version: Option<(u64, clock::Global)>,
+ pub unseen_message_id: Option<u64>,
+ pub parent_path: Vec<u64>,
+}
+
+impl Channel {
+ pub fn link(&self) -> String {
+ RELEASE_CHANNEL.link_prefix().to_owned()
+ + "channel/"
+ + &self.slug()
+ + "-"
+ + &self.id.to_string()
+ }
+
+ pub fn slug(&self) -> String {
+ let slug: String = self
+ .name
+ .chars()
+ .map(|c| if c.is_alphanumeric() { c } else { '-' })
+ .collect();
+
+ slug.trim_matches(|c| c == '-').to_string()
+ }
+
+ pub fn can_edit_notes(&self) -> bool {
+ self.role == proto::ChannelRole::Member || self.role == proto::ChannelRole::Admin
+ }
+}
+
+pub struct ChannelMembership {
+ pub user: Arc<User>,
+ pub kind: proto::channel_member::Kind,
+ pub role: proto::ChannelRole,
+}
+impl ChannelMembership {
+ pub fn sort_key(&self) -> MembershipSortKey {
+ MembershipSortKey {
+ role_order: match self.role {
+ proto::ChannelRole::Admin => 0,
+ proto::ChannelRole::Member => 1,
+ proto::ChannelRole::Banned => 2,
+ proto::ChannelRole::Guest => 3,
+ },
+ kind_order: match self.kind {
+ proto::channel_member::Kind::Member => 0,
+ proto::channel_member::Kind::AncestorMember => 1,
+ proto::channel_member::Kind::Invitee => 2,
+ },
+ username_order: self.user.github_login.as_str(),
+ }
+ }
+}
+
+#[derive(PartialOrd, Ord, PartialEq, Eq)]
+pub struct MembershipSortKey<'a> {
+ role_order: u8,
+ kind_order: u8,
+ username_order: &'a str,
+}
+
+pub enum ChannelEvent {
+ ChannelCreated(ChannelId),
+ ChannelRenamed(ChannelId),
+}
+
+impl EventEmitter for ChannelStore {
+ type Event = ChannelEvent;
+}
+
+enum OpenedModelHandle<E> {
+ Open(WeakModel<E>),
+ Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
+}
+
+impl ChannelStore {
+ pub fn global(cx: &AppContext) -> Model<Self> {
+ cx.global::<Model<Self>>().clone()
+ }
+
+ pub fn new(
+ client: Arc<Client>,
+ user_store: Model<UserStore>,
+ cx: &mut ModelContext<Self>,
+ ) -> Self {
+ let rpc_subscription =
+ client.add_message_handler(cx.weak_model(), Self::handle_update_channels);
+
+ let mut connection_status = client.status();
+ let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
+ let watch_connection_status = cx.spawn(|this, mut cx| async move {
+ while let Some(status) = connection_status.next().await {
+ let this = this.upgrade()?;
+ match status {
+ client::Status::Connected { .. } => {
+ this.update(&mut cx, |this, cx| this.handle_connect(cx))
+ .ok()?
+ .await
+ .log_err()?;
+ }
+ client::Status::SignedOut | client::Status::UpgradeRequired => {
+ this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
+ .ok();
+ }
+ _ => {
+ this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
+ .ok();
+ }
+ }
+ }
+ Some(())
+ });
+
+ Self {
+ channel_invitations: Vec::default(),
+ channel_index: ChannelIndex::default(),
+ channel_participants: Default::default(),
+ outgoing_invites: Default::default(),
+ opened_buffers: Default::default(),
+ opened_chats: Default::default(),
+ update_channels_tx,
+ client,
+ user_store,
+ _rpc_subscription: rpc_subscription,
+ _watch_connection_status: watch_connection_status,
+ disconnect_channel_buffers_task: None,
+ _update_channels: cx.spawn(|this, mut cx| async move {
+ async_maybe!({
+ while let Some(update_channels) = update_channels_rx.next().await {
+ if let Some(this) = this.upgrade() {
+ let update_task = this.update(&mut cx, |this, cx| {
+ this.update_channels(update_channels, cx)
+ })?;
+ if let Some(update_task) = update_task {
+ update_task.await.log_err();
+ }
+ }
+ }
+ anyhow::Ok(())
+ })
+ .await
+ .log_err();
+ }),
+ }
+ }
+
+ pub fn client(&self) -> Arc<Client> {
+ self.client.clone()
+ }
+
+ /// Returns the number of unique channels in the store
+ pub fn channel_count(&self) -> usize {
+ self.channel_index.by_id().len()
+ }
+
+ /// Returns the index of a channel ID in the list of unique channels
+ pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
+ self.channel_index
+ .by_id()
+ .keys()
+ .position(|id| *id == channel_id)
+ }
+
+ /// Returns an iterator over all unique channels
+ pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
+ self.channel_index.by_id().values()
+ }
+
+ /// Iterate over all entries in the channel DAG
+ pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
+ self.channel_index
+ .ordered_channels()
+ .iter()
+ .filter_map(move |id| {
+ let channel = self.channel_index.by_id().get(id)?;
+ Some((channel.parent_path.len(), channel))
+ })
+ }
+
+ pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
+ let channel_id = self.channel_index.ordered_channels().get(ix)?;
+ self.channel_index.by_id().get(channel_id)
+ }
+
+ pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
+ self.channel_index.by_id().values().nth(ix)
+ }
+
+ pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
+ self.channel_invitations
+ .iter()
+ .any(|channel| channel.id == channel_id)
+ }
+
+ pub fn channel_invitations(&self) -> &[Arc<Channel>] {
+ &self.channel_invitations
+ }
+
+ pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
+ self.channel_index.by_id().get(&channel_id)
+ }
+
+ pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
+ if let Some(buffer) = self.opened_buffers.get(&channel_id) {
+ if let OpenedModelHandle::Open(buffer) = buffer {
+ return buffer.upgrade().is_some();
+ }
+ }
+ false
+ }
+
+ pub fn open_channel_buffer(
+ &mut self,
+ channel_id: ChannelId,
+ cx: &mut ModelContext<Self>,
+ ) -> Task<Result<Model<ChannelBuffer>>> {
+ let client = self.client.clone();
+ let user_store = self.user_store.clone();
+ let channel_store = cx.handle();
+ self.open_channel_resource(
+ channel_id,
+ |this| &mut this.opened_buffers,
+ |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
+ cx,
+ )
+ }
+
+ pub fn fetch_channel_messages(
+ &self,
+ message_ids: Vec<u64>,
+ cx: &mut ModelContext<Self>,
+ ) -> Task<Result<Vec<ChannelMessage>>> {
+ let request = if message_ids.is_empty() {
+ None
+ } else {
+ Some(
+ self.client
+ .request(proto::GetChannelMessagesById { message_ids }),
+ )
+ };
+ cx.spawn(|this, mut cx| async move {
+ if let Some(request) = request {
+ let response = request.await?;
+ let this = this
+ .upgrade()
+ .ok_or_else(|| anyhow!("channel store dropped"))?;
+ let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
+ ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
+ } else {
+ Ok(Vec::new())
+ }
+ })
+ }
+
+ pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option<bool> {
+ self.channel_index
+ .by_id()
+ .get(&channel_id)
+ .map(|channel| channel.unseen_note_version.is_some())
+ }
+
+ pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
+ self.channel_index
+ .by_id()
+ .get(&channel_id)
+ .map(|channel| channel.unseen_message_id.is_some())
+ }
+
+ pub fn notes_changed(
+ &mut self,
+ channel_id: ChannelId,
+ epoch: u64,
+ version: &clock::Global,
+ cx: &mut ModelContext<Self>,
+ ) {
+ self.channel_index.note_changed(channel_id, epoch, version);
+ cx.notify();
+ }
+
+ pub fn new_message(
+ &mut self,
+ channel_id: ChannelId,
+ message_id: u64,
+ cx: &mut ModelContext<Self>,
+ ) {
+ self.channel_index.new_message(channel_id, message_id);
+ cx.notify();
+ }
+
+ pub fn acknowledge_message_id(
+ &mut self,
+ channel_id: ChannelId,
+ message_id: u64,
+ cx: &mut ModelContext<Self>,
+ ) {
+ self.channel_index
+ .acknowledge_message_id(channel_id, message_id);
+ cx.notify();
+ }
+
+ pub fn acknowledge_notes_version(
+ &mut self,
+ channel_id: ChannelId,
+ epoch: u64,
+ version: &clock::Global,
+ cx: &mut ModelContext<Self>,
+ ) {
+ self.channel_index
+ .acknowledge_note_version(channel_id, epoch, version);
+ cx.notify();
+ }
+
+ pub fn open_channel_chat(
+ &mut self,
+ channel_id: ChannelId,
+ cx: &mut ModelContext<Self>,
+ ) -> Task<Result<Model<ChannelChat>>> {
+ let client = self.client.clone();
+ let user_store = self.user_store.clone();
+ let this = cx.handle();
+ self.open_channel_resource(
+ channel_id,
+ |this| &mut this.opened_chats,
+ |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
+ cx,
+ )
+ }
+
+ /// Asynchronously open a given resource associated with a channel.
+ ///
+ /// Make sure that the resource is only opened once, even if this method
+ /// is called multiple times with the same channel id while the first task
+ /// is still running.
+ fn open_channel_resource<T, F, Fut>(
+ &mut self,
+ channel_id: ChannelId,
+ get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
+ load: F,
+ cx: &mut ModelContext<Self>,
+ ) -> Task<Result<Model<T>>>
+ where
+ F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
+ Fut: Future<Output = Result<Model<T>>>,
+ T: 'static,
+ {
+ let task = loop {
+ match get_map(self).entry(channel_id) {
+ hash_map::Entry::Occupied(e) => match e.get() {
+ OpenedModelHandle::Open(model) => {
+ if let Some(model) = model.upgrade() {
+ break Task::ready(Ok(model)).shared();
+ } else {
+ get_map(self).remove(&channel_id);
+ continue;
+ }
+ }
+ OpenedModelHandle::Loading(task) => {
+ break task.clone();
+ }
+ },
+ hash_map::Entry::Vacant(e) => {
+ let task = cx
+ .spawn(move |this, mut cx| async move {
+ let channel = this.update(&mut cx, |this, _| {
+ this.channel_for_id(channel_id).cloned().ok_or_else(|| {
+ Arc::new(anyhow!("no channel for id: {}", channel_id))
+ })
+ })??;
+
+ load(channel, cx).await.map_err(Arc::new)
+ })
+ .shared();
+
+ e.insert(OpenedModelHandle::Loading(task.clone()));
+ cx.spawn({
+ let task = task.clone();
+ move |this, mut cx| async move {
+ let result = task.await;
+ this.update(&mut cx, |this, _| match result {
+ Ok(model) => {
+ get_map(this).insert(
+ channel_id,
+ OpenedModelHandle::Open(model.downgrade()),
+ );
+ }
+ Err(_) => {
+ get_map(this).remove(&channel_id);
+ }
+ })
+ .ok();
+ }
+ })
+ .detach();
+ break task;
+ }
+ }
+ };
+ cx.background_executor()
+ .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
+ }
+
+ pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
+ let Some(channel) = self.channel_for_id(channel_id) else {
+ return false;
+ };
+ channel.role == proto::ChannelRole::Admin
+ }
+
+ pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
+ self.channel_participants
+ .get(&channel_id)
+ .map_or(&[], |v| v.as_slice())
+ }
+
+ pub fn create_channel(
+ &self,
+ name: &str,
+ parent_id: Option<ChannelId>,
+ cx: &mut ModelContext<Self>,
+ ) -> Task<Result<ChannelId>> {
+ let client = self.client.clone();
+ let name = name.trim_start_matches("#").to_owned();
+ cx.spawn(move |this, mut cx| async move {
+ let response = client
+ .request(proto::CreateChannel { name, parent_id })
+ .await?;
+
+ let channel = response
+ .channel
+ .ok_or_else(|| anyhow!("missing channel in response"))?;
+ let channel_id = channel.id;
+
+ this.update(&mut cx, |this, cx| {
+ let task = this.update_channels(
+ proto::UpdateChannels {
+ channels: vec![channel],
+ ..Default::default()
+ },
+ cx,
+ );
+ assert!(task.is_none());
+
+ // This event is emitted because the collab panel wants to clear the pending edit state
+ // before this frame is rendered. But we can't guarantee that the collab panel's future
+ // will resolve before this flush_effects finishes. Synchronously emitting this event
+ // ensures that the collab panel will observe this creation before the frame completes
+ cx.emit(ChannelEvent::ChannelCreated(channel_id));
+ })?;
+
+ Ok(channel_id)
+ })
+ }
+
+ pub fn move_channel(
+ &mut self,
+ channel_id: ChannelId,
+ to: Option<ChannelId>,
+ cx: &mut ModelContext<Self>,
+ ) -> Task<Result<()>> {
+ let client = self.client.clone();
+ cx.spawn(move |_, _| async move {
+ let _ = client
+ .request(proto::MoveChannel { channel_id, to })
+ .await?;
+
+ Ok(())
+ })
+ }
+
+ pub fn set_channel_visibility(
+ &mut self,
+ channel_id: ChannelId,
+ visibility: ChannelVisibility,
+ cx: &mut ModelContext<Self>,
+ ) -> Task<Result<()>> {
+ let client = self.client.clone();
+ cx.spawn(move |_, _| async move {
+ let _ = client
+ .request(proto::SetChannelVisibility {
+ channel_id,
+ visibility: visibility.into(),
+ })
+ .await?;
+
+ Ok(())
+ })
+ }
+
+ pub fn invite_member(
+ &mut self,
+ channel_id: ChannelId,
+ user_id: UserId,
+ role: proto::ChannelRole,
+ cx: &mut ModelContext<Self>,
+ ) -> Task<Result<()>> {
+ if !self.outgoing_invites.insert((channel_id, user_id)) {
+ return Task::ready(Err(anyhow!("invite request already in progress")));
+ }
+
+ cx.notify();
+ let client = self.client.clone();
+ cx.spawn(move |this, mut cx| async move {
+ let result = client
+ .request(proto::InviteChannelMember {
+ channel_id,
+ user_id,
+ role: role.into(),
+ })
+ .await;
+
+ this.update(&mut cx, |this, cx| {
+ this.outgoing_invites.remove(&(channel_id, user_id));
+ cx.notify();
+ })?;
+
+ result?;
+
+ Ok(())
+ })
+ }
+
+ pub fn remove_member(
+ &mut self,
+ channel_id: ChannelId,
+ user_id: u64,
+ cx: &mut ModelContext<Self>,
+ ) -> Task<Result<()>> {
+ if !self.outgoing_invites.insert((channel_id, user_id)) {
+ return Task::ready(Err(anyhow!("invite request already in progress")));
+ }
+
+ cx.notify();
+ let client = self.client.clone();
+ cx.spawn(move |this, mut cx| async move {
+ let result = client
+ .request(proto::RemoveChannelMember {
+ channel_id,
+ user_id,
+ })
+ .await;
+
+ this.update(&mut cx, |this, cx| {
+ this.outgoing_invites.remove(&(channel_id, user_id));
+ cx.notify();
+ })?;
+ result?;
+ Ok(())
+ })
+ }
+
+ pub fn set_member_role(
+ &mut self,
+ channel_id: ChannelId,
+ user_id: UserId,
+ role: proto::ChannelRole,
+ cx: &mut ModelContext<Self>,
+ ) -> Task<Result<()>> {
+ if !self.outgoing_invites.insert((channel_id, user_id)) {
+ return Task::ready(Err(anyhow!("member request already in progress")));
+ }
+
+ cx.notify();
+ let client = self.client.clone();
+ cx.spawn(move |this, mut cx| async move {
+ let result = client
+ .request(proto::SetChannelMemberRole {
+ channel_id,
+ user_id,
+ role: role.into(),
+ })
+ .await;
+
+ this.update(&mut cx, |this, cx| {
+ this.outgoing_invites.remove(&(channel_id, user_id));
+ cx.notify();
+ })?;
+
+ result?;
+ Ok(())
+ })
+ }
+
+ pub fn rename(
+ &mut self,
+ channel_id: ChannelId,
+ new_name: &str,
+ cx: &mut ModelContext<Self>,
+ ) -> Task<Result<()>> {
+ let client = self.client.clone();
+ let name = new_name.to_string();
+ cx.spawn(move |this, mut cx| async move {
+ let channel = client
+ .request(proto::RenameChannel { channel_id, name })
+ .await?
+ .channel
+ .ok_or_else(|| anyhow!("missing channel in response"))?;
+ this.update(&mut cx, |this, cx| {
+ let task = this.update_channels(
+ proto::UpdateChannels {
+ channels: vec![channel],
+ ..Default::default()
+ },
+ cx,
+ );
+ assert!(task.is_none());
+
+ // This event is emitted because the collab panel wants to clear the pending edit state
+ // before this frame is rendered. But we can't guarantee that the collab panel's future
+ // will resolve before this flush_effects finishes. Synchronously emitting this event
+ // ensures that the collab panel will observe this creation before the frame complete
+ cx.emit(ChannelEvent::ChannelRenamed(channel_id))
+ })?;
+ Ok(())
+ })
+ }
+
+ pub fn respond_to_channel_invite(
+ &mut self,
+ channel_id: ChannelId,
+ accept: bool,
+ cx: &mut ModelContext<Self>,
+ ) -> Task<Result<()>> {
+ let client = self.client.clone();
+ cx.background_executor().spawn(async move {
+ client
+ .request(proto::RespondToChannelInvite { channel_id, accept })
+ .await?;
+ Ok(())
+ })
+ }
+
+ pub fn get_channel_member_details(
+ &self,
+ channel_id: ChannelId,
+ cx: &mut ModelContext<Self>,
+ ) -> Task<Result<Vec<ChannelMembership>>> {
+ let client = self.client.clone();
+ let user_store = self.user_store.downgrade();
+ cx.spawn(move |_, mut cx| async move {
+ let response = client
+ .request(proto::GetChannelMembers { channel_id })
+ .await?;
+
+ let user_ids = response.members.iter().map(|m| m.user_id).collect();
+ let user_store = user_store
+ .upgrade()
+ .ok_or_else(|| anyhow!("user store dropped"))?;
+ let users = user_store
+ .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
+ .await?;
+
+ Ok(users
+ .into_iter()
+ .zip(response.members)
+ .filter_map(|(user, member)| {
+ Some(ChannelMembership {
+ user,
+ role: member.role(),
+ kind: member.kind(),
+ })
+ })
+ .collect())
+ })
+ }
+
+ pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
+ let client = self.client.clone();
+ async move {
+ client.request(proto::DeleteChannel { channel_id }).await?;
+ Ok(())
+ }
+ }
+
+ pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
+ false
+ }
+
+ pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
+ self.outgoing_invites.contains(&(channel_id, user_id))
+ }
+
+ async fn handle_update_channels(
+ this: Model<Self>,
+ message: TypedEnvelope<proto::UpdateChannels>,
+ _: Arc<Client>,
+ mut cx: AsyncAppContext,
+ ) -> Result<()> {
+ this.update(&mut cx, |this, _| {
+ this.update_channels_tx
+ .unbounded_send(message.payload)
+ .unwrap();
+ })?;
+ Ok(())
+ }
+
+ fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
+ self.channel_index.clear();
+ self.channel_invitations.clear();
+ self.channel_participants.clear();
+ self.channel_index.clear();
+ self.outgoing_invites.clear();
+ self.disconnect_channel_buffers_task.take();
+
+ for chat in self.opened_chats.values() {
+ if let OpenedModelHandle::Open(chat) = chat {
+ if let Some(chat) = chat.upgrade() {
+ chat.update(cx, |chat, cx| {
+ chat.rejoin(cx);
+ });
+ }
+ }
+ }
+
+ let mut buffer_versions = Vec::new();
+ for buffer in self.opened_buffers.values() {
+ if let OpenedModelHandle::Open(buffer) = buffer {
+ if let Some(buffer) = buffer.upgrade() {
+ let channel_buffer = buffer.read(cx);
+ let buffer = channel_buffer.buffer().read(cx);
+ buffer_versions.push(proto::ChannelBufferVersion {
+ channel_id: channel_buffer.channel_id,
+ epoch: channel_buffer.epoch(),
+ version: language::proto::serialize_version(&buffer.version()),
+ });
+ }
+ }
+ }
+
+ if buffer_versions.is_empty() {
+ return Task::ready(Ok(()));
+ }
+
+ let response = self.client.request(proto::RejoinChannelBuffers {
+ buffers: buffer_versions,
+ });
+
+ cx.spawn(|this, mut cx| async move {
+ let mut response = response.await?;
+
+ this.update(&mut cx, |this, cx| {
+ this.opened_buffers.retain(|_, buffer| match buffer {
+ OpenedModelHandle::Open(channel_buffer) => {
+ let Some(channel_buffer) = channel_buffer.upgrade() else {
+ return false;
+ };
+
+ channel_buffer.update(cx, |channel_buffer, cx| {
+ let channel_id = channel_buffer.channel_id;
+ if let Some(remote_buffer) = response
+ .buffers
+ .iter_mut()
+ .find(|buffer| buffer.channel_id == channel_id)
+ {
+ let channel_id = channel_buffer.channel_id;
+ let remote_version =
+ language::proto::deserialize_version(&remote_buffer.version);
+
+ channel_buffer.replace_collaborators(
+ mem::take(&mut remote_buffer.collaborators),
+ cx,
+ );
+
+ let operations = channel_buffer
+ .buffer()
+ .update(cx, |buffer, cx| {
+ let outgoing_operations =
+ buffer.serialize_ops(Some(remote_version), cx);
+ let incoming_operations =
+ mem::take(&mut remote_buffer.operations)
+ .into_iter()
+ .map(language::proto::deserialize_operation)
+ .collect::<Result<Vec<_>>>()?;
+ buffer.apply_ops(incoming_operations, cx)?;
+ anyhow::Ok(outgoing_operations)
+ })
+ .log_err();
+
+ if let Some(operations) = operations {
+ let client = this.client.clone();
+ cx.background_executor()
+ .spawn(async move {
+ let operations = operations.await;
+ for chunk in
+ language::proto::split_operations(operations)
+ {
+ client
+ .send(proto::UpdateChannelBuffer {
+ channel_id,
+ operations: chunk,
+ })
+ .ok();
+ }
+ })
+ .detach();
+ return true;
+ }
+ }
+
+ channel_buffer.disconnect(cx);
+ false
+ })
+ }
+ OpenedModelHandle::Loading(_) => true,
+ });
+ })
+ .ok();
+ anyhow::Ok(())
+ })
+ }
+
+ fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
+ cx.notify();
+
+ self.disconnect_channel_buffers_task.get_or_insert_with(|| {
+ cx.spawn(move |this, mut cx| async move {
+ if wait_for_reconnect {
+ cx.background_executor().timer(RECONNECT_TIMEOUT).await;
+ }
+
+ if let Some(this) = this.upgrade() {
+ this.update(&mut cx, |this, cx| {
+ for (_, buffer) in this.opened_buffers.drain() {
+ if let OpenedModelHandle::Open(buffer) = buffer {
+ if let Some(buffer) = buffer.upgrade() {
+ buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
+ }
+ }
+ }
+ })
+ .ok();
+ }
+ })
+ });
+ }
+
+ pub(crate) fn update_channels(
+ &mut self,
+ payload: proto::UpdateChannels,
+ cx: &mut ModelContext<ChannelStore>,
+ ) -> Option<Task<Result<()>>> {
+ if !payload.remove_channel_invitations.is_empty() {
+ self.channel_invitations
+ .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
+ }
+ for channel in payload.channel_invitations {
+ match self
+ .channel_invitations
+ .binary_search_by_key(&channel.id, |c| c.id)
+ {
+ Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
+ Err(ix) => self.channel_invitations.insert(
+ ix,
+ Arc::new(Channel {
+ id: channel.id,
+ visibility: channel.visibility(),
+ role: channel.role(),
+ name: channel.name,
+ unseen_note_version: None,
+ unseen_message_id: None,
+ parent_path: channel.parent_path,
+ }),
+ ),
+ }
+ }
+
+ let channels_changed = !payload.channels.is_empty()
+ || !payload.delete_channels.is_empty()
+ || !payload.unseen_channel_messages.is_empty()
+ || !payload.unseen_channel_buffer_changes.is_empty();
+
+ if channels_changed {
+ if !payload.delete_channels.is_empty() {
+ self.channel_index.delete_channels(&payload.delete_channels);
+ self.channel_participants
+ .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));
+
+ for channel_id in &payload.delete_channels {
+ let channel_id = *channel_id;
+ if payload
+ .channels
+ .iter()
+ .any(|channel| channel.id == channel_id)
+ {
+ continue;
+ }
+ if let Some(OpenedModelHandle::Open(buffer)) =
+ self.opened_buffers.remove(&channel_id)
+ {
+ if let Some(buffer) = buffer.upgrade() {
+ buffer.update(cx, ChannelBuffer::disconnect);
+ }
+ }
+ }
+ }
+
+ let mut index = self.channel_index.bulk_insert();
+ for channel in payload.channels {
+ let id = channel.id;
+ let channel_changed = index.insert(channel);
+
+ if channel_changed {
+ if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
+ if let Some(buffer) = buffer.upgrade() {
+ buffer.update(cx, ChannelBuffer::channel_changed);
+ }
+ }
+ }
+ }
+
+ for unseen_buffer_change in payload.unseen_channel_buffer_changes {
+ let version = language::proto::deserialize_version(&unseen_buffer_change.version);
+ index.note_changed(
+ unseen_buffer_change.channel_id,
+ unseen_buffer_change.epoch,
+ &version,
+ );
+ }
+
+ for unseen_channel_message in payload.unseen_channel_messages {
+ index.new_messages(
+ unseen_channel_message.channel_id,
+ unseen_channel_message.message_id,
+ );
+ }
+ }
+
+ cx.notify();
+ if payload.channel_participants.is_empty() {
+ return None;
+ }
+
+ let mut all_user_ids = Vec::new();
+ let channel_participants = payload.channel_participants;
+ for entry in &channel_participants {
+ for user_id in entry.participant_user_ids.iter() {
+ if let Err(ix) = all_user_ids.binary_search(user_id) {
+ all_user_ids.insert(ix, *user_id);
+ }
+ }
+ }
+
+ let users = self
+ .user_store
+ .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
+ Some(cx.spawn(|this, mut cx| async move {
+ let users = users.await?;
+
+ this.update(&mut cx, |this, cx| {
+ for entry in &channel_participants {
+ let mut participants: Vec<_> = entry
+ .participant_user_ids
+ .iter()
+ .filter_map(|user_id| {
+ users
+ .binary_search_by_key(&user_id, |user| &user.id)
+ .ok()
+ .map(|ix| users[ix].clone())
+ })
+ .collect();
+
+ participants.sort_by_key(|u| u.id);
+
+ this.channel_participants
+ .insert(entry.channel_id, participants);
+ }
+
+ cx.notify();
+ })
+ }))
+ }
+}
@@ -0,0 +1,184 @@
+use crate::{Channel, ChannelId};
+use collections::BTreeMap;
+use rpc::proto;
+use std::sync::Arc;
+
+#[derive(Default, Debug)]
+pub struct ChannelIndex {
+ channels_ordered: Vec<ChannelId>,
+ channels_by_id: BTreeMap<ChannelId, Arc<Channel>>,
+}
+
+impl ChannelIndex {
+ pub fn by_id(&self) -> &BTreeMap<ChannelId, Arc<Channel>> {
+ &self.channels_by_id
+ }
+
+ pub fn ordered_channels(&self) -> &[ChannelId] {
+ &self.channels_ordered
+ }
+
+ pub fn clear(&mut self) {
+ self.channels_ordered.clear();
+ self.channels_by_id.clear();
+ }
+
+ /// Delete the given channels from this index.
+ pub fn delete_channels(&mut self, channels: &[ChannelId]) {
+ self.channels_by_id
+ .retain(|channel_id, _| !channels.contains(channel_id));
+ self.channels_ordered
+ .retain(|channel_id| !channels.contains(channel_id));
+ }
+
+ pub fn bulk_insert(&mut self) -> ChannelPathsInsertGuard {
+ ChannelPathsInsertGuard {
+ channels_ordered: &mut self.channels_ordered,
+ channels_by_id: &mut self.channels_by_id,
+ }
+ }
+
+ pub fn acknowledge_note_version(
+ &mut self,
+ channel_id: ChannelId,
+ epoch: u64,
+ version: &clock::Global,
+ ) {
+ if let Some(channel) = self.channels_by_id.get_mut(&channel_id) {
+ let channel = Arc::make_mut(channel);
+ if let Some((unseen_epoch, unseen_version)) = &channel.unseen_note_version {
+ if epoch > *unseen_epoch
+ || epoch == *unseen_epoch && version.observed_all(unseen_version)
+ {
+ channel.unseen_note_version = None;
+ }
+ }
+ }
+ }
+
+ pub fn acknowledge_message_id(&mut self, channel_id: ChannelId, message_id: u64) {
+ if let Some(channel) = self.channels_by_id.get_mut(&channel_id) {
+ let channel = Arc::make_mut(channel);
+ if let Some(unseen_message_id) = channel.unseen_message_id {
+ if message_id >= unseen_message_id {
+ channel.unseen_message_id = None;
+ }
+ }
+ }
+ }
+
+ pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) {
+ insert_note_changed(&mut self.channels_by_id, channel_id, epoch, version);
+ }
+
+ pub fn new_message(&mut self, channel_id: ChannelId, message_id: u64) {
+ insert_new_message(&mut self.channels_by_id, channel_id, message_id)
+ }
+}
+
+/// A guard for ensuring that the paths index maintains its sort and uniqueness
+/// invariants after a series of insertions
+#[derive(Debug)]
+pub struct ChannelPathsInsertGuard<'a> {
+ channels_ordered: &'a mut Vec<ChannelId>,
+ channels_by_id: &'a mut BTreeMap<ChannelId, Arc<Channel>>,
+}
+
+impl<'a> ChannelPathsInsertGuard<'a> {
+ pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) {
+ insert_note_changed(&mut self.channels_by_id, channel_id, epoch, &version);
+ }
+
+ pub fn new_messages(&mut self, channel_id: ChannelId, message_id: u64) {
+ insert_new_message(&mut self.channels_by_id, channel_id, message_id)
+ }
+
+ pub fn insert(&mut self, channel_proto: proto::Channel) -> bool {
+ let mut ret = false;
+ if let Some(existing_channel) = self.channels_by_id.get_mut(&channel_proto.id) {
+ let existing_channel = Arc::make_mut(existing_channel);
+
+ ret = existing_channel.visibility != channel_proto.visibility()
+ || existing_channel.role != channel_proto.role()
+ || existing_channel.name != channel_proto.name;
+
+ existing_channel.visibility = channel_proto.visibility();
+ existing_channel.role = channel_proto.role();
+ existing_channel.name = channel_proto.name;
+ } else {
+ self.channels_by_id.insert(
+ channel_proto.id,
+ Arc::new(Channel {
+ id: channel_proto.id,
+ visibility: channel_proto.visibility(),
+ role: channel_proto.role(),
+ name: channel_proto.name,
+ unseen_note_version: None,
+ unseen_message_id: None,
+ parent_path: channel_proto.parent_path,
+ }),
+ );
+ self.insert_root(channel_proto.id);
+ }
+ ret
+ }
+
+ fn insert_root(&mut self, channel_id: ChannelId) {
+ self.channels_ordered.push(channel_id);
+ }
+}
+
+impl<'a> Drop for ChannelPathsInsertGuard<'a> {
+ fn drop(&mut self) {
+ self.channels_ordered.sort_by(|a, b| {
+ let a = channel_path_sorting_key(*a, &self.channels_by_id);
+ let b = channel_path_sorting_key(*b, &self.channels_by_id);
+ a.cmp(b)
+ });
+ self.channels_ordered.dedup();
+ }
+}
+
+fn channel_path_sorting_key<'a>(
+ id: ChannelId,
+ channels_by_id: &'a BTreeMap<ChannelId, Arc<Channel>>,
+) -> impl Iterator<Item = &str> {
+ let (parent_path, name) = channels_by_id
+ .get(&id)
+ .map_or((&[] as &[_], None), |channel| {
+ (channel.parent_path.as_slice(), Some(channel.name.as_str()))
+ });
+ parent_path
+ .iter()
+ .filter_map(|id| Some(channels_by_id.get(id)?.name.as_str()))
+ .chain(name)
+}
+
+fn insert_note_changed(
+ channels_by_id: &mut BTreeMap<ChannelId, Arc<Channel>>,
+ channel_id: u64,
+ epoch: u64,
+ version: &clock::Global,
+) {
+ if let Some(channel) = channels_by_id.get_mut(&channel_id) {
+ let unseen_version = Arc::make_mut(channel)
+ .unseen_note_version
+ .get_or_insert((0, clock::Global::new()));
+ if epoch > unseen_version.0 {
+ *unseen_version = (epoch, version.clone());
+ } else {
+ unseen_version.1.join(&version);
+ }
+ }
+}
+
+fn insert_new_message(
+ channels_by_id: &mut BTreeMap<ChannelId, Arc<Channel>>,
+ channel_id: u64,
+ message_id: u64,
+) {
+ if let Some(channel) = channels_by_id.get_mut(&channel_id) {
+ let unseen_message_id = Arc::make_mut(channel).unseen_message_id.get_or_insert(0);
+ *unseen_message_id = message_id.max(*unseen_message_id);
+ }
+}
@@ -0,0 +1,380 @@
+use crate::channel_chat::ChannelChatEvent;
+
+use super::*;
+use client::{test::FakeServer, Client, UserStore};
+use gpui::{AppContext, Context, Model, TestAppContext};
+use rpc::proto::{self};
+use settings::SettingsStore;
+use util::http::FakeHttpClient;
+
+#[gpui::test]
+fn test_update_channels(cx: &mut AppContext) {
+ let channel_store = init_test(cx);
+
+ update_channels(
+ &channel_store,
+ proto::UpdateChannels {
+ channels: vec![
+ proto::Channel {
+ id: 1,
+ name: "b".to_string(),
+ visibility: proto::ChannelVisibility::Members as i32,
+ role: proto::ChannelRole::Admin.into(),
+ parent_path: Vec::new(),
+ },
+ proto::Channel {
+ id: 2,
+ name: "a".to_string(),
+ visibility: proto::ChannelVisibility::Members as i32,
+ role: proto::ChannelRole::Member.into(),
+ parent_path: Vec::new(),
+ },
+ ],
+ ..Default::default()
+ },
+ cx,
+ );
+ assert_channels(
+ &channel_store,
+ &[
+ //
+ (0, "a".to_string(), proto::ChannelRole::Member),
+ (0, "b".to_string(), proto::ChannelRole::Admin),
+ ],
+ cx,
+ );
+
+ update_channels(
+ &channel_store,
+ proto::UpdateChannels {
+ channels: vec![
+ proto::Channel {
+ id: 3,
+ name: "x".to_string(),
+ visibility: proto::ChannelVisibility::Members as i32,
+ role: proto::ChannelRole::Admin.into(),
+ parent_path: vec![1],
+ },
+ proto::Channel {
+ id: 4,
+ name: "y".to_string(),
+ visibility: proto::ChannelVisibility::Members as i32,
+ role: proto::ChannelRole::Member.into(),
+ parent_path: vec![2],
+ },
+ ],
+ ..Default::default()
+ },
+ cx,
+ );
+ assert_channels(
+ &channel_store,
+ &[
+ (0, "a".to_string(), proto::ChannelRole::Member),
+ (1, "y".to_string(), proto::ChannelRole::Member),
+ (0, "b".to_string(), proto::ChannelRole::Admin),
+ (1, "x".to_string(), proto::ChannelRole::Admin),
+ ],
+ cx,
+ );
+}
+
+#[gpui::test]
+fn test_dangling_channel_paths(cx: &mut AppContext) {
+ let channel_store = init_test(cx);
+
+ update_channels(
+ &channel_store,
+ proto::UpdateChannels {
+ channels: vec![
+ proto::Channel {
+ id: 0,
+ name: "a".to_string(),
+ visibility: proto::ChannelVisibility::Members as i32,
+ role: proto::ChannelRole::Admin.into(),
+ parent_path: vec![],
+ },
+ proto::Channel {
+ id: 1,
+ name: "b".to_string(),
+ visibility: proto::ChannelVisibility::Members as i32,
+ role: proto::ChannelRole::Admin.into(),
+ parent_path: vec![0],
+ },
+ proto::Channel {
+ id: 2,
+ name: "c".to_string(),
+ visibility: proto::ChannelVisibility::Members as i32,
+ role: proto::ChannelRole::Admin.into(),
+ parent_path: vec![0, 1],
+ },
+ ],
+ ..Default::default()
+ },
+ cx,
+ );
+ // Sanity check
+ assert_channels(
+ &channel_store,
+ &[
+ //
+ (0, "a".to_string(), proto::ChannelRole::Admin),
+ (1, "b".to_string(), proto::ChannelRole::Admin),
+ (2, "c".to_string(), proto::ChannelRole::Admin),
+ ],
+ cx,
+ );
+
+ update_channels(
+ &channel_store,
+ proto::UpdateChannels {
+ delete_channels: vec![1, 2],
+ ..Default::default()
+ },
+ cx,
+ );
+
+ // Make sure that the 1/2/3 path is gone
+ assert_channels(
+ &channel_store,
+ &[(0, "a".to_string(), proto::ChannelRole::Admin)],
+ cx,
+ );
+}
+
+#[gpui::test]
+async fn test_channel_messages(cx: &mut TestAppContext) {
+ let user_id = 5;
+ let channel_id = 5;
+ let channel_store = cx.update(init_test);
+ let client = channel_store.update(cx, |s, _| s.client());
+ let server = FakeServer::for_client(user_id, &client, cx).await;
+
+ // Get the available channels.
+ server.send(proto::UpdateChannels {
+ channels: vec![proto::Channel {
+ id: channel_id,
+ name: "the-channel".to_string(),
+ visibility: proto::ChannelVisibility::Members as i32,
+ role: proto::ChannelRole::Member.into(),
+ parent_path: vec![],
+ }],
+ ..Default::default()
+ });
+ cx.executor().run_until_parked();
+ cx.update(|cx| {
+ assert_channels(
+ &channel_store,
+ &[(0, "the-channel".to_string(), proto::ChannelRole::Member)],
+ cx,
+ );
+ });
+
+ let get_users = server.receive::<proto::GetUsers>().await.unwrap();
+ assert_eq!(get_users.payload.user_ids, vec![5]);
+ server.respond(
+ get_users.receipt(),
+ proto::UsersResponse {
+ users: vec![proto::User {
+ id: 5,
+ github_login: "nathansobo".into(),
+ avatar_url: "http://avatar.com/nathansobo".into(),
+ }],
+ },
+ );
+
+ // Join a channel and populate its existing messages.
+ let channel = channel_store.update(cx, |store, cx| {
+ let channel_id = store.ordered_channels().next().unwrap().1.id;
+ store.open_channel_chat(channel_id, cx)
+ });
+ let join_channel = server.receive::<proto::JoinChannelChat>().await.unwrap();
+ server.respond(
+ join_channel.receipt(),
+ proto::JoinChannelChatResponse {
+ messages: vec![
+ proto::ChannelMessage {
+ id: 10,
+ body: "a".into(),
+ timestamp: 1000,
+ sender_id: 5,
+ mentions: vec![],
+ nonce: Some(1.into()),
+ },
+ proto::ChannelMessage {
+ id: 11,
+ body: "b".into(),
+ timestamp: 1001,
+ sender_id: 6,
+ mentions: vec![],
+ nonce: Some(2.into()),
+ },
+ ],
+ done: false,
+ },
+ );
+
+ cx.executor().start_waiting();
+
+ // Client requests all users for the received messages
+ let mut get_users = server.receive::<proto::GetUsers>().await.unwrap();
+ get_users.payload.user_ids.sort();
+ assert_eq!(get_users.payload.user_ids, vec![6]);
+ server.respond(
+ get_users.receipt(),
+ proto::UsersResponse {
+ users: vec![proto::User {
+ id: 6,
+ github_login: "maxbrunsfeld".into(),
+ avatar_url: "http://avatar.com/maxbrunsfeld".into(),
+ }],
+ },
+ );
+
+ let channel = channel.await.unwrap();
+ channel.update(cx, |channel, _| {
+ assert_eq!(
+ channel
+ .messages_in_range(0..2)
+ .map(|message| (message.sender.github_login.clone(), message.body.clone()))
+ .collect::<Vec<_>>(),
+ &[
+ ("nathansobo".into(), "a".into()),
+ ("maxbrunsfeld".into(), "b".into())
+ ]
+ );
+ });
+
+ // Receive a new message.
+ server.send(proto::ChannelMessageSent {
+ channel_id,
+ message: Some(proto::ChannelMessage {
+ id: 12,
+ body: "c".into(),
+ timestamp: 1002,
+ sender_id: 7,
+ mentions: vec![],
+ nonce: Some(3.into()),
+ }),
+ });
+
+ // Client requests user for message since they haven't seen them yet
+ let get_users = server.receive::<proto::GetUsers>().await.unwrap();
+ assert_eq!(get_users.payload.user_ids, vec![7]);
+ server.respond(
+ get_users.receipt(),
+ proto::UsersResponse {
+ users: vec![proto::User {
+ id: 7,
+ github_login: "as-cii".into(),
+ avatar_url: "http://avatar.com/as-cii".into(),
+ }],
+ },
+ );
+
+ assert_eq!(
+ channel.next_event(cx),
+ ChannelChatEvent::MessagesUpdated {
+ old_range: 2..2,
+ new_count: 1,
+ }
+ );
+ channel.update(cx, |channel, _| {
+ assert_eq!(
+ channel
+ .messages_in_range(2..3)
+ .map(|message| (message.sender.github_login.clone(), message.body.clone()))
+ .collect::<Vec<_>>(),
+ &[("as-cii".into(), "c".into())]
+ )
+ });
+
+ // Scroll up to view older messages.
+ channel.update(cx, |channel, cx| {
+ channel.load_more_messages(cx).unwrap().detach();
+ });
+ let get_messages = server.receive::<proto::GetChannelMessages>().await.unwrap();
+ assert_eq!(get_messages.payload.channel_id, 5);
+ assert_eq!(get_messages.payload.before_message_id, 10);
+ server.respond(
+ get_messages.receipt(),
+ proto::GetChannelMessagesResponse {
+ done: true,
+ messages: vec![
+ proto::ChannelMessage {
+ id: 8,
+ body: "y".into(),
+ timestamp: 998,
+ sender_id: 5,
+ nonce: Some(4.into()),
+ mentions: vec![],
+ },
+ proto::ChannelMessage {
+ id: 9,
+ body: "z".into(),
+ timestamp: 999,
+ sender_id: 6,
+ nonce: Some(5.into()),
+ mentions: vec![],
+ },
+ ],
+ },
+ );
+
+ assert_eq!(
+ channel.next_event(cx),
+ ChannelChatEvent::MessagesUpdated {
+ old_range: 0..0,
+ new_count: 2,
+ }
+ );
+ channel.update(cx, |channel, _| {
+ assert_eq!(
+ channel
+ .messages_in_range(0..2)
+ .map(|message| (message.sender.github_login.clone(), message.body.clone()))
+ .collect::<Vec<_>>(),
+ &[
+ ("nathansobo".into(), "y".into()),
+ ("maxbrunsfeld".into(), "z".into())
+ ]
+ );
+ });
+}
+
+fn init_test(cx: &mut AppContext) -> Model<ChannelStore> {
+ let http = FakeHttpClient::with_404_response();
+ let client = Client::new(http.clone(), cx);
+ let user_store = cx.build_model(|cx| UserStore::new(client.clone(), http, cx));
+
+ let settings_store = SettingsStore::test(cx);
+ cx.set_global(settings_store);
+ client::init(&client, cx);
+ crate::init(&client, user_store, cx);
+
+ ChannelStore::global(cx)
+}
+
+fn update_channels(
+ channel_store: &Model<ChannelStore>,
+ message: proto::UpdateChannels,
+ cx: &mut AppContext,
+) {
+ let task = channel_store.update(cx, |store, cx| store.update_channels(message, cx));
+ assert!(task.is_none());
+}
+
+#[track_caller]
+fn assert_channels(
+ channel_store: &Model<ChannelStore>,
+ expected_channels: &[(usize, String, proto::ChannelRole)],
+ cx: &mut AppContext,
+) {
+ let actual = channel_store.update(cx, |store, _| {
+ store
+ .ordered_channels()
+ .map(|(depth, channel)| (depth, channel.name.to_string(), channel.role))
+ .collect::<Vec<_>>()
+ });
+ assert_eq!(actual, expected_channels);
+}
@@ -292,22 +292,18 @@ impl UserStore {
.upgrade()
.ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
for contact in message.contacts {
- let should_notify = contact.should_notify;
- updated_contacts.push((
- Arc::new(Contact::from_proto(contact, &this, &mut cx).await?),
- should_notify,
+ updated_contacts.push(Arc::new(
+ Contact::from_proto(contact, &this, &mut cx).await?,
));
}
let mut incoming_requests = Vec::new();
for request in message.incoming_requests {
incoming_requests.push({
- let user = this
- .update(&mut cx, |this, cx| {
- this.get_user(request.requester_id, cx)
- })?
- .await?;
- (user, request.should_notify)
+ this.update(&mut cx, |this, cx| {
+ this.get_user(request.requester_id, cx)
+ })?
+ .await?
});
}
@@ -331,13 +327,7 @@ impl UserStore {
this.contacts
.retain(|contact| !removed_contacts.contains(&contact.user.id));
// Update existing contacts and insert new ones
- for (updated_contact, should_notify) in updated_contacts {
- if should_notify {
- cx.emit(Event::Contact {
- user: updated_contact.user.clone(),
- kind: ContactEventKind::Accepted,
- });
- }
+ for updated_contact in updated_contacts {
match this.contacts.binary_search_by_key(
&&updated_contact.user.github_login,
|contact| &contact.user.github_login,
@@ -360,14 +350,7 @@ impl UserStore {
}
});
// Update existing incoming requests and insert new ones
- for (user, should_notify) in incoming_requests {
- if should_notify {
- cx.emit(Event::Contact {
- user: user.clone(),
- kind: ContactEventKind::Requested,
- });
- }
-
+ for user in incoming_requests {
match this
.incoming_contact_requests
.binary_search_by_key(&&user.github_login, |contact| {
@@ -46,13 +46,17 @@ pub struct AppCell {
}
impl AppCell {
+ #[track_caller]
pub fn borrow(&self) -> AppRef {
+ let thread_id = std::thread::current().id();
+ eprintln!("borrowed {thread_id:?}");
AppRef(self.app.borrow())
}
+ #[track_caller]
pub fn borrow_mut(&self) -> AppRefMut {
- // let thread_id = std::thread::current().id();
- // dbg!("borrowed {thread_id:?}");
+ let thread_id = std::thread::current().id();
+ eprintln!("borrowed {thread_id:?}");
AppRefMut(self.app.borrow_mut())
}
}
@@ -189,3 +189,22 @@ impl TestAppContext {
.unwrap();
}
}
+
+impl<T: Send + EventEmitter> Model<T> {
+ pub fn next_event(&self, cx: &mut TestAppContext) -> T::Event
+ where
+ T::Event: Send + Clone,
+ {
+ let (tx, mut rx) = futures::channel::mpsc::unbounded();
+ let _subscription = self.update(cx, |_, cx| {
+ cx.subscribe(self, move |_, _, event, _| {
+ tx.unbounded_send(event.clone()).ok();
+ })
+ });
+
+ cx.executor().run_until_parked();
+ rx.try_next()
+ .expect("no event received")
+ .expect("model was dropped")
+ }
+}
@@ -175,6 +175,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream {
inner_fn_args.extend(quote!(&mut #cx_varname_lock,));
cx_teardowns.extend(quote!(
#cx_varname_lock.quit();
+ drop(#cx_varname_lock);
dispatcher.run_until_parked();
));
continue;
@@ -89,88 +89,96 @@ message Envelope {
FormatBuffersResponse format_buffers_response = 70;
GetCompletions get_completions = 71;
GetCompletionsResponse get_completions_response = 72;
- ApplyCompletionAdditionalEdits apply_completion_additional_edits = 73;
- ApplyCompletionAdditionalEditsResponse apply_completion_additional_edits_response = 74;
- GetCodeActions get_code_actions = 75;
- GetCodeActionsResponse get_code_actions_response = 76;
- GetHover get_hover = 77;
- GetHoverResponse get_hover_response = 78;
- ApplyCodeAction apply_code_action = 79;
- ApplyCodeActionResponse apply_code_action_response = 80;
- PrepareRename prepare_rename = 81;
- PrepareRenameResponse prepare_rename_response = 82;
- PerformRename perform_rename = 83;
- PerformRenameResponse perform_rename_response = 84;
- SearchProject search_project = 85;
- SearchProjectResponse search_project_response = 86;
-
- UpdateContacts update_contacts = 87;
- UpdateInviteInfo update_invite_info = 88;
- ShowContacts show_contacts = 89;
-
- GetUsers get_users = 90;
- FuzzySearchUsers fuzzy_search_users = 91;
- UsersResponse users_response = 92;
- RequestContact request_contact = 93;
- RespondToContactRequest respond_to_contact_request = 94;
- RemoveContact remove_contact = 95;
-
- Follow follow = 96;
- FollowResponse follow_response = 97;
- UpdateFollowers update_followers = 98;
- Unfollow unfollow = 99;
- GetPrivateUserInfo get_private_user_info = 100;
- GetPrivateUserInfoResponse get_private_user_info_response = 101;
- UpdateDiffBase update_diff_base = 102;
-
- OnTypeFormatting on_type_formatting = 103;
- OnTypeFormattingResponse on_type_formatting_response = 104;
-
- UpdateWorktreeSettings update_worktree_settings = 105;
-
- InlayHints inlay_hints = 106;
- InlayHintsResponse inlay_hints_response = 107;
- ResolveInlayHint resolve_inlay_hint = 108;
- ResolveInlayHintResponse resolve_inlay_hint_response = 109;
- RefreshInlayHints refresh_inlay_hints = 110;
-
- CreateChannel create_channel = 111;
- CreateChannelResponse create_channel_response = 112;
- InviteChannelMember invite_channel_member = 113;
- RemoveChannelMember remove_channel_member = 114;
- RespondToChannelInvite respond_to_channel_invite = 115;
- UpdateChannels update_channels = 116;
- JoinChannel join_channel = 117;
- DeleteChannel delete_channel = 118;
- GetChannelMembers get_channel_members = 119;
- GetChannelMembersResponse get_channel_members_response = 120;
- SetChannelMemberAdmin set_channel_member_admin = 121;
- RenameChannel rename_channel = 122;
- RenameChannelResponse rename_channel_response = 123;
-
- JoinChannelBuffer join_channel_buffer = 124;
- JoinChannelBufferResponse join_channel_buffer_response = 125;
- UpdateChannelBuffer update_channel_buffer = 126;
- LeaveChannelBuffer leave_channel_buffer = 127;
- UpdateChannelBufferCollaborators update_channel_buffer_collaborators = 128;
- RejoinChannelBuffers rejoin_channel_buffers = 129;
- RejoinChannelBuffersResponse rejoin_channel_buffers_response = 130;
- AckBufferOperation ack_buffer_operation = 143;
-
- JoinChannelChat join_channel_chat = 131;
- JoinChannelChatResponse join_channel_chat_response = 132;
- LeaveChannelChat leave_channel_chat = 133;
- SendChannelMessage send_channel_message = 134;
- SendChannelMessageResponse send_channel_message_response = 135;
- ChannelMessageSent channel_message_sent = 136;
- GetChannelMessages get_channel_messages = 137;
- GetChannelMessagesResponse get_channel_messages_response = 138;
- RemoveChannelMessage remove_channel_message = 139;
- AckChannelMessage ack_channel_message = 144;
-
- LinkChannel link_channel = 140;
- UnlinkChannel unlink_channel = 141;
- MoveChannel move_channel = 142; // current max: 144
+ ResolveCompletionDocumentation resolve_completion_documentation = 73;
+ ResolveCompletionDocumentationResponse resolve_completion_documentation_response = 74;
+ ApplyCompletionAdditionalEdits apply_completion_additional_edits = 75;
+ ApplyCompletionAdditionalEditsResponse apply_completion_additional_edits_response = 76;
+ GetCodeActions get_code_actions = 77;
+ GetCodeActionsResponse get_code_actions_response = 78;
+ GetHover get_hover = 79;
+ GetHoverResponse get_hover_response = 80;
+ ApplyCodeAction apply_code_action = 81;
+ ApplyCodeActionResponse apply_code_action_response = 82;
+ PrepareRename prepare_rename = 83;
+ PrepareRenameResponse prepare_rename_response = 84;
+ PerformRename perform_rename = 85;
+ PerformRenameResponse perform_rename_response = 86;
+ SearchProject search_project = 87;
+ SearchProjectResponse search_project_response = 88;
+
+ UpdateContacts update_contacts = 89;
+ UpdateInviteInfo update_invite_info = 90;
+ ShowContacts show_contacts = 91;
+
+ GetUsers get_users = 92;
+ FuzzySearchUsers fuzzy_search_users = 93;
+ UsersResponse users_response = 94;
+ RequestContact request_contact = 95;
+ RespondToContactRequest respond_to_contact_request = 96;
+ RemoveContact remove_contact = 97;
+
+ Follow follow = 98;
+ FollowResponse follow_response = 99;
+ UpdateFollowers update_followers = 100;
+ Unfollow unfollow = 101;
+ GetPrivateUserInfo get_private_user_info = 102;
+ GetPrivateUserInfoResponse get_private_user_info_response = 103;
+ UpdateDiffBase update_diff_base = 104;
+
+ OnTypeFormatting on_type_formatting = 105;
+ OnTypeFormattingResponse on_type_formatting_response = 106;
+
+ UpdateWorktreeSettings update_worktree_settings = 107;
+
+ InlayHints inlay_hints = 108;
+ InlayHintsResponse inlay_hints_response = 109;
+ ResolveInlayHint resolve_inlay_hint = 110;
+ ResolveInlayHintResponse resolve_inlay_hint_response = 111;
+ RefreshInlayHints refresh_inlay_hints = 112;
+
+ CreateChannel create_channel = 113;
+ CreateChannelResponse create_channel_response = 114;
+ InviteChannelMember invite_channel_member = 115;
+ RemoveChannelMember remove_channel_member = 116;
+ RespondToChannelInvite respond_to_channel_invite = 117;
+ UpdateChannels update_channels = 118;
+ JoinChannel join_channel = 119;
+ DeleteChannel delete_channel = 120;
+ GetChannelMembers get_channel_members = 121;
+ GetChannelMembersResponse get_channel_members_response = 122;
+ SetChannelMemberRole set_channel_member_role = 123;
+ RenameChannel rename_channel = 124;
+ RenameChannelResponse rename_channel_response = 125;
+
+ JoinChannelBuffer join_channel_buffer = 126;
+ JoinChannelBufferResponse join_channel_buffer_response = 127;
+ UpdateChannelBuffer update_channel_buffer = 128;
+ LeaveChannelBuffer leave_channel_buffer = 129;
+ UpdateChannelBufferCollaborators update_channel_buffer_collaborators = 130;
+ RejoinChannelBuffers rejoin_channel_buffers = 131;
+ RejoinChannelBuffersResponse rejoin_channel_buffers_response = 132;
+ AckBufferOperation ack_buffer_operation = 133;
+
+ JoinChannelChat join_channel_chat = 134;
+ JoinChannelChatResponse join_channel_chat_response = 135;
+ LeaveChannelChat leave_channel_chat = 136;
+ SendChannelMessage send_channel_message = 137;
+ SendChannelMessageResponse send_channel_message_response = 138;
+ ChannelMessageSent channel_message_sent = 139;
+ GetChannelMessages get_channel_messages = 140;
+ GetChannelMessagesResponse get_channel_messages_response = 141;
+ RemoveChannelMessage remove_channel_message = 142;
+ AckChannelMessage ack_channel_message = 143;
+ GetChannelMessagesById get_channel_messages_by_id = 144;
+
+ MoveChannel move_channel = 147;
+ SetChannelVisibility set_channel_visibility = 148;
+
+ AddNotification add_notification = 149;
+ GetNotifications get_notifications = 150;
+ GetNotificationsResponse get_notifications_response = 151;
+ DeleteNotification delete_notification = 152;
+ MarkNotificationRead mark_notification_read = 153; // Current max
}
}
@@ -332,6 +340,7 @@ message RoomUpdated {
message LiveKitConnectionInfo {
string server_url = 1;
string token = 2;
+ bool can_publish = 3;
}
message ShareProject {
@@ -832,6 +841,17 @@ message ResolveState {
}
}
+message ResolveCompletionDocumentation {
+ uint64 project_id = 1;
+ uint64 language_server_id = 2;
+ bytes lsp_completion = 3;
+}
+
+message ResolveCompletionDocumentationResponse {
+ string text = 1;
+ bool is_markdown = 2;
+}
+
message ResolveInlayHint {
uint64 project_id = 1;
uint64 buffer_id = 2;
@@ -950,13 +970,10 @@ message LspDiskBasedDiagnosticsUpdated {}
message UpdateChannels {
repeated Channel channels = 1;
- repeated ChannelEdge insert_edge = 2;
- repeated ChannelEdge delete_edge = 3;
repeated uint64 delete_channels = 4;
repeated Channel channel_invitations = 5;
repeated uint64 remove_channel_invitations = 6;
repeated ChannelParticipants channel_participants = 7;
- repeated ChannelPermission channel_permissions = 8;
repeated UnseenChannelMessage unseen_channel_messages = 9;
repeated UnseenChannelBufferChange unseen_channel_buffer_changes = 10;
}
@@ -972,14 +989,9 @@ message UnseenChannelBufferChange {
repeated VectorClockEntry version = 3;
}
-message ChannelEdge {
- uint64 channel_id = 1;
- uint64 parent_id = 2;
-}
-
message ChannelPermission {
uint64 channel_id = 1;
- bool is_admin = 2;
+ ChannelRole role = 3;
}
message ChannelParticipants {
@@ -1005,8 +1017,8 @@ message GetChannelMembersResponse {
message ChannelMember {
uint64 user_id = 1;
- bool admin = 2;
Kind kind = 3;
+ ChannelRole role = 4;
enum Kind {
Member = 0;
@@ -1028,7 +1040,7 @@ message CreateChannelResponse {
message InviteChannelMember {
uint64 channel_id = 1;
uint64 user_id = 2;
- bool admin = 3;
+ ChannelRole role = 4;
}
message RemoveChannelMember {
@@ -1036,10 +1048,22 @@ message RemoveChannelMember {
uint64 user_id = 2;
}
-message SetChannelMemberAdmin {
+enum ChannelRole {
+ Admin = 0;
+ Member = 1;
+ Guest = 2;
+ Banned = 3;
+}
+
+message SetChannelMemberRole {
uint64 channel_id = 1;
uint64 user_id = 2;
- bool admin = 3;
+ ChannelRole role = 3;
+}
+
+message SetChannelVisibility {
+ uint64 channel_id = 1;
+ ChannelVisibility visibility = 2;
}
message RenameChannel {
@@ -1068,6 +1092,7 @@ message SendChannelMessage {
uint64 channel_id = 1;
string body = 2;
Nonce nonce = 3;
+ repeated ChatMention mentions = 4;
}
message RemoveChannelMessage {
@@ -1099,20 +1124,13 @@ message GetChannelMessagesResponse {
bool done = 2;
}
-message LinkChannel {
- uint64 channel_id = 1;
- uint64 to = 2;
-}
-
-message UnlinkChannel {
- uint64 channel_id = 1;
- uint64 from = 2;
+message GetChannelMessagesById {
+ repeated uint64 message_ids = 1;
}
message MoveChannel {
uint64 channel_id = 1;
- uint64 from = 2;
- uint64 to = 3;
+ optional uint64 to = 2;
}
message JoinChannelBuffer {
@@ -1125,6 +1143,12 @@ message ChannelMessage {
uint64 timestamp = 3;
uint64 sender_id = 4;
Nonce nonce = 5;
+ repeated ChatMention mentions = 6;
+}
+
+message ChatMention {
+ Range range = 1;
+ uint64 user_id = 2;
}
message RejoinChannelBuffers {
@@ -1216,7 +1240,6 @@ message ShowContacts {}
message IncomingContactRequest {
uint64 requester_id = 1;
- bool should_notify = 2;
}
message UpdateDiagnostics {
@@ -1533,16 +1556,23 @@ message Nonce {
uint64 lower_half = 2;
}
+enum ChannelVisibility {
+ Public = 0;
+ Members = 1;
+}
+
message Channel {
uint64 id = 1;
string name = 2;
+ ChannelVisibility visibility = 3;
+ ChannelRole role = 4;
+ repeated uint64 parent_path = 5;
}
message Contact {
uint64 user_id = 1;
bool online = 2;
bool busy = 3;
- bool should_notify = 4;
}
message WorktreeMetadata {
@@ -1557,3 +1587,34 @@ message UpdateDiffBase {
uint64 buffer_id = 2;
optional string diff_base = 3;
}
+
+message GetNotifications {
+ optional uint64 before_id = 1;
+}
+
+message AddNotification {
+ Notification notification = 1;
+}
+
+message GetNotificationsResponse {
+ repeated Notification notifications = 1;
+ bool done = 2;
+}
+
+message DeleteNotification {
+ uint64 notification_id = 1;
+}
+
+message MarkNotificationRead {
+ uint64 notification_id = 1;
+}
+
+message Notification {
+ uint64 id = 1;
+ uint64 timestamp = 2;
+ string kind = 3;
+ optional uint64 entity_id = 4;
+ string content = 5;
+ bool is_read = 6;
+ optional bool response = 7;
+}
@@ -133,6 +133,9 @@ impl fmt::Display for PeerId {
messages!(
(Ack, Foreground),
+ (AckBufferOperation, Background),
+ (AckChannelMessage, Background),
+ (AddNotification, Foreground),
(AddProjectCollaborator, Foreground),
(ApplyCodeAction, Background),
(ApplyCodeActionResponse, Background),
@@ -143,57 +146,74 @@ messages!(
(Call, Foreground),
(CallCanceled, Foreground),
(CancelCall, Foreground),
+ (ChannelMessageSent, Foreground),
(CopyProjectEntry, Foreground),
(CreateBufferForPeer, Foreground),
(CreateChannel, Foreground),
(CreateChannelResponse, Foreground),
- (ChannelMessageSent, Foreground),
(CreateProjectEntry, Foreground),
(CreateRoom, Foreground),
(CreateRoomResponse, Foreground),
(DeclineCall, Foreground),
+ (DeleteChannel, Foreground),
+ (DeleteNotification, Foreground),
(DeleteProjectEntry, Foreground),
(Error, Foreground),
(ExpandProjectEntry, Foreground),
+ (ExpandProjectEntryResponse, Foreground),
(Follow, Foreground),
(FollowResponse, Foreground),
(FormatBuffers, Foreground),
(FormatBuffersResponse, Foreground),
(FuzzySearchUsers, Foreground),
- (GetCodeActions, Background),
- (GetCodeActionsResponse, Background),
- (GetHover, Background),
- (GetHoverResponse, Background),
+ (GetChannelMembers, Foreground),
+ (GetChannelMembersResponse, Foreground),
(GetChannelMessages, Background),
+ (GetChannelMessagesById, Background),
(GetChannelMessagesResponse, Background),
- (SendChannelMessage, Background),
- (SendChannelMessageResponse, Background),
+ (GetCodeActions, Background),
+ (GetCodeActionsResponse, Background),
(GetCompletions, Background),
(GetCompletionsResponse, Background),
(GetDefinition, Background),
(GetDefinitionResponse, Background),
- (GetTypeDefinition, Background),
- (GetTypeDefinitionResponse, Background),
(GetDocumentHighlights, Background),
(GetDocumentHighlightsResponse, Background),
- (GetReferences, Background),
- (GetReferencesResponse, Background),
+ (GetHover, Background),
+ (GetHoverResponse, Background),
+ (GetNotifications, Foreground),
+ (GetNotificationsResponse, Foreground),
+ (GetPrivateUserInfo, Foreground),
+ (GetPrivateUserInfoResponse, Foreground),
(GetProjectSymbols, Background),
(GetProjectSymbolsResponse, Background),
+ (GetReferences, Background),
+ (GetReferencesResponse, Background),
+ (GetTypeDefinition, Background),
+ (GetTypeDefinitionResponse, Background),
(GetUsers, Foreground),
(Hello, Foreground),
(IncomingCall, Foreground),
+ (InlayHints, Background),
+ (InlayHintsResponse, Background),
(InviteChannelMember, Foreground),
- (UsersResponse, Foreground),
+ (JoinChannel, Foreground),
+ (JoinChannelBuffer, Foreground),
+ (JoinChannelBufferResponse, Foreground),
+ (JoinChannelChat, Foreground),
+ (JoinChannelChatResponse, Foreground),
(JoinProject, Foreground),
(JoinProjectResponse, Foreground),
(JoinRoom, Foreground),
(JoinRoomResponse, Foreground),
- (JoinChannelChat, Foreground),
- (JoinChannelChatResponse, Foreground),
+ (LeaveChannelBuffer, Background),
(LeaveChannelChat, Foreground),
(LeaveProject, Foreground),
(LeaveRoom, Foreground),
+ (MarkNotificationRead, Foreground),
+ (MoveChannel, Foreground),
+ (OnTypeFormatting, Background),
+ (OnTypeFormattingResponse, Background),
(OpenBufferById, Background),
(OpenBufferByPath, Background),
(OpenBufferForSymbol, Background),
@@ -201,58 +221,56 @@ messages!(
(OpenBufferResponse, Background),
(PerformRename, Background),
(PerformRenameResponse, Background),
- (OnTypeFormatting, Background),
- (OnTypeFormattingResponse, Background),
- (InlayHints, Background),
- (InlayHintsResponse, Background),
- (ResolveInlayHint, Background),
- (ResolveInlayHintResponse, Background),
- (RefreshInlayHints, Foreground),
(Ping, Foreground),
(PrepareRename, Background),
(PrepareRenameResponse, Background),
- (ExpandProjectEntryResponse, Foreground),
(ProjectEntryResponse, Foreground),
+ (RefreshInlayHints, Foreground),
+ (RejoinChannelBuffers, Foreground),
+ (RejoinChannelBuffersResponse, Foreground),
(RejoinRoom, Foreground),
(RejoinRoomResponse, Foreground),
- (RemoveContact, Foreground),
- (RemoveChannelMember, Foreground),
- (RemoveChannelMessage, Foreground),
(ReloadBuffers, Foreground),
(ReloadBuffersResponse, Foreground),
+ (RemoveChannelMember, Foreground),
+ (RemoveChannelMessage, Foreground),
+ (RemoveContact, Foreground),
(RemoveProjectCollaborator, Foreground),
+ (RenameChannel, Foreground),
+ (RenameChannelResponse, Foreground),
(RenameProjectEntry, Foreground),
(RequestContact, Foreground),
- (RespondToContactRequest, Foreground),
+ (ResolveCompletionDocumentation, Background),
+ (ResolveCompletionDocumentationResponse, Background),
+ (ResolveInlayHint, Background),
+ (ResolveInlayHintResponse, Background),
(RespondToChannelInvite, Foreground),
- (JoinChannel, Foreground),
+ (RespondToContactRequest, Foreground),
(RoomUpdated, Foreground),
(SaveBuffer, Foreground),
- (RenameChannel, Foreground),
- (RenameChannelResponse, Foreground),
- (SetChannelMemberAdmin, Foreground),
+ (SetChannelMemberRole, Foreground),
+ (SetChannelVisibility, Foreground),
(SearchProject, Background),
(SearchProjectResponse, Background),
+ (SendChannelMessage, Background),
+ (SendChannelMessageResponse, Background),
(ShareProject, Foreground),
(ShareProjectResponse, Foreground),
(ShowContacts, Foreground),
(StartLanguageServer, Foreground),
(SynchronizeBuffers, Foreground),
(SynchronizeBuffersResponse, Foreground),
- (RejoinChannelBuffers, Foreground),
- (RejoinChannelBuffersResponse, Foreground),
(Test, Foreground),
(Unfollow, Foreground),
(UnshareProject, Foreground),
(UpdateBuffer, Foreground),
(UpdateBufferFile, Foreground),
- (UpdateContacts, Foreground),
- (DeleteChannel, Foreground),
- (MoveChannel, Foreground),
- (LinkChannel, Foreground),
- (UnlinkChannel, Foreground),
+ (UpdateChannelBuffer, Foreground),
+ (UpdateChannelBufferCollaborators, Foreground),
(UpdateChannels, Foreground),
+ (UpdateContacts, Foreground),
(UpdateDiagnosticSummary, Foreground),
+ (UpdateDiffBase, Foreground),
(UpdateFollowers, Foreground),
(UpdateInviteInfo, Foreground),
(UpdateLanguageServer, Foreground),
@@ -261,18 +279,7 @@ messages!(
(UpdateProjectCollaborator, Foreground),
(UpdateWorktree, Foreground),
(UpdateWorktreeSettings, Foreground),
- (UpdateDiffBase, Foreground),
- (GetPrivateUserInfo, Foreground),
- (GetPrivateUserInfoResponse, Foreground),
- (GetChannelMembers, Foreground),
- (GetChannelMembersResponse, Foreground),
- (JoinChannelBuffer, Foreground),
- (JoinChannelBufferResponse, Foreground),
- (LeaveChannelBuffer, Background),
- (UpdateChannelBuffer, Foreground),
- (UpdateChannelBufferCollaborators, Foreground),
- (AckBufferOperation, Background),
- (AckChannelMessage, Background),
+ (UsersResponse, Foreground),
);
request_messages!(
@@ -284,72 +291,78 @@ request_messages!(
(Call, Ack),
(CancelCall, Ack),
(CopyProjectEntry, ProjectEntryResponse),
+ (CreateChannel, CreateChannelResponse),
(CreateProjectEntry, ProjectEntryResponse),
(CreateRoom, CreateRoomResponse),
- (CreateChannel, CreateChannelResponse),
(DeclineCall, Ack),
+ (DeleteChannel, Ack),
(DeleteProjectEntry, ProjectEntryResponse),
(ExpandProjectEntry, ExpandProjectEntryResponse),
(Follow, FollowResponse),
(FormatBuffers, FormatBuffersResponse),
+ (FuzzySearchUsers, UsersResponse),
+ (GetChannelMembers, GetChannelMembersResponse),
+ (GetChannelMessages, GetChannelMessagesResponse),
+ (GetChannelMessagesById, GetChannelMessagesResponse),
(GetCodeActions, GetCodeActionsResponse),
- (GetHover, GetHoverResponse),
(GetCompletions, GetCompletionsResponse),
(GetDefinition, GetDefinitionResponse),
- (GetTypeDefinition, GetTypeDefinitionResponse),
(GetDocumentHighlights, GetDocumentHighlightsResponse),
- (GetReferences, GetReferencesResponse),
+ (GetHover, GetHoverResponse),
+ (GetNotifications, GetNotificationsResponse),
(GetPrivateUserInfo, GetPrivateUserInfoResponse),
(GetProjectSymbols, GetProjectSymbolsResponse),
- (FuzzySearchUsers, UsersResponse),
+ (GetReferences, GetReferencesResponse),
+ (GetTypeDefinition, GetTypeDefinitionResponse),
(GetUsers, UsersResponse),
+ (IncomingCall, Ack),
+ (InlayHints, InlayHintsResponse),
(InviteChannelMember, Ack),
+ (JoinChannel, JoinRoomResponse),
+ (JoinChannelBuffer, JoinChannelBufferResponse),
+ (JoinChannelChat, JoinChannelChatResponse),
(JoinProject, JoinProjectResponse),
(JoinRoom, JoinRoomResponse),
- (JoinChannelChat, JoinChannelChatResponse),
+ (LeaveChannelBuffer, Ack),
(LeaveRoom, Ack),
- (RejoinRoom, RejoinRoomResponse),
- (IncomingCall, Ack),
+ (MarkNotificationRead, Ack),
+ (MoveChannel, Ack),
+ (OnTypeFormatting, OnTypeFormattingResponse),
(OpenBufferById, OpenBufferResponse),
(OpenBufferByPath, OpenBufferResponse),
(OpenBufferForSymbol, OpenBufferForSymbolResponse),
- (Ping, Ack),
(PerformRename, PerformRenameResponse),
+ (Ping, Ack),
(PrepareRename, PrepareRenameResponse),
- (OnTypeFormatting, OnTypeFormattingResponse),
- (InlayHints, InlayHintsResponse),
- (ResolveInlayHint, ResolveInlayHintResponse),
(RefreshInlayHints, Ack),
+ (RejoinChannelBuffers, RejoinChannelBuffersResponse),
+ (RejoinRoom, RejoinRoomResponse),
(ReloadBuffers, ReloadBuffersResponse),
- (RequestContact, Ack),
(RemoveChannelMember, Ack),
- (RemoveContact, Ack),
- (RespondToContactRequest, Ack),
- (RespondToChannelInvite, Ack),
- (SetChannelMemberAdmin, Ack),
- (SendChannelMessage, SendChannelMessageResponse),
- (GetChannelMessages, GetChannelMessagesResponse),
- (GetChannelMembers, GetChannelMembersResponse),
- (JoinChannel, JoinRoomResponse),
(RemoveChannelMessage, Ack),
- (DeleteChannel, Ack),
- (RenameProjectEntry, ProjectEntryResponse),
+ (RemoveContact, Ack),
(RenameChannel, RenameChannelResponse),
- (LinkChannel, Ack),
- (UnlinkChannel, Ack),
- (MoveChannel, Ack),
+ (RenameProjectEntry, ProjectEntryResponse),
+ (RequestContact, Ack),
+ (
+ ResolveCompletionDocumentation,
+ ResolveCompletionDocumentationResponse
+ ),
+ (ResolveInlayHint, ResolveInlayHintResponse),
+ (RespondToChannelInvite, Ack),
+ (RespondToContactRequest, Ack),
(SaveBuffer, BufferSaved),
(SearchProject, SearchProjectResponse),
+ (SendChannelMessage, SendChannelMessageResponse),
+ (SetChannelMemberRole, Ack),
+ (SetChannelVisibility, Ack),
(ShareProject, ShareProjectResponse),
(SynchronizeBuffers, SynchronizeBuffersResponse),
- (RejoinChannelBuffers, RejoinChannelBuffersResponse),
(Test, Test),
(UpdateBuffer, Ack),
(UpdateParticipantLocation, Ack),
(UpdateProject, Ack),
(UpdateWorktree, Ack),
- (JoinChannelBuffer, JoinChannelBufferResponse),
- (LeaveChannelBuffer, Ack)
);
entity_messages!(
@@ -368,25 +381,26 @@ entity_messages!(
GetCodeActions,
GetCompletions,
GetDefinition,
- GetTypeDefinition,
GetDocumentHighlights,
GetHover,
- GetReferences,
GetProjectSymbols,
+ GetReferences,
+ GetTypeDefinition,
+ InlayHints,
JoinProject,
LeaveProject,
+ OnTypeFormatting,
OpenBufferById,
OpenBufferByPath,
OpenBufferForSymbol,
PerformRename,
- OnTypeFormatting,
- InlayHints,
- ResolveInlayHint,
- RefreshInlayHints,
PrepareRename,
+ RefreshInlayHints,
ReloadBuffers,
RemoveProjectCollaborator,
RenameProjectEntry,
+ ResolveCompletionDocumentation,
+ ResolveInlayHint,
SaveBuffer,
SearchProject,
StartLanguageServer,
@@ -395,19 +409,19 @@ entity_messages!(
UpdateBuffer,
UpdateBufferFile,
UpdateDiagnosticSummary,
+ UpdateDiffBase,
UpdateLanguageServer,
UpdateProject,
UpdateProjectCollaborator,
UpdateWorktree,
UpdateWorktreeSettings,
- UpdateDiffBase
);
entity_messages!(
channel_id,
ChannelMessageSent,
- UpdateChannelBuffer,
RemoveChannelMessage,
+ UpdateChannelBuffer,
UpdateChannelBufferCollaborators,
);