diff --git a/Cargo.lock b/Cargo.lock
index 2f549c568dd011e19f70480b29f079ec5794388a..829ed18bbbf8334364902ce382354ab615922594 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1554,6 +1554,7 @@ dependencies = [
"settings",
"theme",
"theme_selector",
+ "time 0.3.27",
"util",
"vcs_menu",
"workspace",
diff --git a/assets/icons/user_group_16.svg b/assets/icons/user_group_16.svg
new file mode 100644
index 0000000000000000000000000000000000000000..aa99277646653c899ee049547e5574b76b25b840
--- /dev/null
+++ b/assets/icons/user_group_16.svg
@@ -0,0 +1,3 @@
+
diff --git a/assets/settings/default.json b/assets/settings/default.json
index 6739819e713f38f9d0628eaf061bdd2ff509da69..86def54d323aebc0225fb5c89ee8a7a104c50a40 100644
--- a/assets/settings/default.json
+++ b/assets/settings/default.json
@@ -131,6 +131,14 @@
// Default width of the channels panel.
"default_width": 240
},
+ "chat_panel": {
+ // Whether to show the collaboration panel button in the status bar.
+ "button": true,
+ // Where to dock channels panel. Can be 'left' or 'right'.
+ "dock": "right",
+ // Default width of the channels panel.
+ "default_width": 240
+ },
"assistant": {
// Whether to show the assistant panel button in the status bar.
"button": true,
diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs
index cc7445dbcc74ff620968c9ff5a2a99686bd800d9..e7899ab2d8ebf48da7d4bddbb7eb62d49a7a7750 100644
--- a/crates/call/src/room.rs
+++ b/crates/call/src/room.rs
@@ -172,7 +172,7 @@ impl Room {
cx.spawn(|this, mut cx| async move {
connect.await?;
- if !cx.read(|cx| settings::get::(cx).mute_on_join) {
+ if !cx.read(Self::mute_on_join) {
this.update(&mut cx, |this, cx| this.share_microphone(cx))
.await?;
}
@@ -301,6 +301,10 @@ impl Room {
})
}
+ pub fn mute_on_join(cx: &AppContext) -> bool {
+ settings::get::(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
+ }
+
fn from_join_response(
response: proto::JoinRoomResponse,
client: Arc,
@@ -1124,7 +1128,7 @@ impl Room {
self.live_kit
.as_ref()
.and_then(|live_kit| match &live_kit.microphone_track {
- LocalTrack::None => Some(settings::get::(cx).mute_on_join),
+ LocalTrack::None => Some(Self::mute_on_join(cx)),
LocalTrack::Pending { muted, .. } => Some(*muted),
LocalTrack::Published { muted, .. } => Some(*muted),
})
diff --git a/crates/channel/Cargo.toml b/crates/channel/Cargo.toml
index c2191fdfa3edaaf0824e5e59ed974a7c53030ccd..00e9135bc1791f7a59e9270f48e9c9282f7b5b5d 100644
--- a/crates/channel/Cargo.toml
+++ b/crates/channel/Cargo.toml
@@ -47,5 +47,6 @@ tempfile = "3"
collections = { path = "../collections", features = ["test-support"] }
gpui = { path = "../gpui", features = ["test-support"] }
rpc = { path = "../rpc", features = ["test-support"] }
+client = { path = "../client", features = ["test-support"] }
settings = { path = "../settings", features = ["test-support"] }
util = { path = "../util", features = ["test-support"] }
diff --git a/crates/channel/src/channel.rs b/crates/channel/src/channel.rs
index 15631b7dd312f36126ec1e13b2413fc01e5ca8af..37f1c0ce44ba8a8f3a86247ea411ba0d2b669f7d 100644
--- a/crates/channel/src/channel.rs
+++ b/crates/channel/src/channel.rs
@@ -1,14 +1,18 @@
+mod channel_buffer;
+mod channel_chat;
mod channel_store;
-pub mod channel_buffer;
-use std::sync::Arc;
+pub use channel_buffer::{ChannelBuffer, ChannelBufferEvent};
+pub use channel_chat::{ChannelChat, ChannelChatEvent, ChannelMessage, ChannelMessageId};
+pub use channel_store::{Channel, ChannelEvent, ChannelId, ChannelMembership, ChannelStore};
-pub use channel_store::*;
use client::Client;
+use std::sync::Arc;
#[cfg(test)]
mod channel_store_tests;
pub fn init(client: &Arc) {
channel_buffer::init(client);
+ channel_chat::init(client);
}
diff --git a/crates/channel/src/channel_buffer.rs b/crates/channel/src/channel_buffer.rs
index e11282cf7963a9ba4f34c7530a6e8267fbe35274..06f9093fb5f9c628ba3ec2320f3605480f65486c 100644
--- a/crates/channel/src/channel_buffer.rs
+++ b/crates/channel/src/channel_buffer.rs
@@ -23,13 +23,13 @@ pub struct ChannelBuffer {
subscription: Option,
}
-pub enum Event {
+pub enum ChannelBufferEvent {
CollaboratorsChanged,
Disconnected,
}
impl Entity for ChannelBuffer {
- type Event = Event;
+ type Event = ChannelBufferEvent;
fn release(&mut self, _: &mut AppContext) {
if self.connected {
@@ -101,7 +101,7 @@ impl ChannelBuffer {
}
}
self.collaborators = collaborators;
- cx.emit(Event::CollaboratorsChanged);
+ cx.emit(ChannelBufferEvent::CollaboratorsChanged);
cx.notify();
}
@@ -141,7 +141,7 @@ impl ChannelBuffer {
this.update(&mut cx, |this, cx| {
this.collaborators.push(collaborator);
- cx.emit(Event::CollaboratorsChanged);
+ cx.emit(ChannelBufferEvent::CollaboratorsChanged);
cx.notify();
});
@@ -165,7 +165,7 @@ impl ChannelBuffer {
true
}
});
- cx.emit(Event::CollaboratorsChanged);
+ cx.emit(ChannelBufferEvent::CollaboratorsChanged);
cx.notify();
});
@@ -185,7 +185,7 @@ impl ChannelBuffer {
break;
}
}
- cx.emit(Event::CollaboratorsChanged);
+ cx.emit(ChannelBufferEvent::CollaboratorsChanged);
cx.notify();
});
@@ -230,7 +230,7 @@ impl ChannelBuffer {
if self.connected {
self.connected = false;
self.subscription.take();
- cx.emit(Event::Disconnected);
+ cx.emit(ChannelBufferEvent::Disconnected);
cx.notify()
}
}
diff --git a/crates/channel/src/channel_chat.rs b/crates/channel/src/channel_chat.rs
new file mode 100644
index 0000000000000000000000000000000000000000..8e03a3b6fd0f1d2d56ccb648525f268766a9eaf1
--- /dev/null
+++ b/crates/channel/src/channel_chat.rs
@@ -0,0 +1,505 @@
+use crate::Channel;
+use anyhow::{anyhow, Result};
+use client::{
+ proto,
+ user::{User, UserStore},
+ Client, Subscription, TypedEnvelope,
+};
+use futures::lock::Mutex;
+use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task};
+use rand::prelude::*;
+use std::{collections::HashSet, mem, ops::Range, sync::Arc};
+use sum_tree::{Bias, SumTree};
+use time::OffsetDateTime;
+use util::{post_inc, ResultExt as _, TryFutureExt};
+
+pub struct ChannelChat {
+ channel: Arc,
+ messages: SumTree,
+ loaded_all_messages: bool,
+ next_pending_message_id: usize,
+ user_store: ModelHandle,
+ rpc: Arc,
+ outgoing_messages_lock: Arc>,
+ rng: StdRng,
+ _subscription: Subscription,
+}
+
+#[derive(Clone, Debug)]
+pub struct ChannelMessage {
+ pub id: ChannelMessageId,
+ pub body: String,
+ pub timestamp: OffsetDateTime,
+ pub sender: Arc,
+ pub nonce: u128,
+}
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
+pub enum ChannelMessageId {
+ Saved(u64),
+ Pending(usize),
+}
+
+#[derive(Clone, Debug, Default)]
+pub struct ChannelMessageSummary {
+ max_id: ChannelMessageId,
+ count: usize,
+}
+
+#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
+struct Count(usize);
+
+#[derive(Clone, Debug, PartialEq)]
+pub enum ChannelChatEvent {
+ MessagesUpdated {
+ old_range: Range,
+ new_count: usize,
+ },
+}
+
+pub fn init(client: &Arc) {
+ client.add_model_message_handler(ChannelChat::handle_message_sent);
+ client.add_model_message_handler(ChannelChat::handle_message_removed);
+}
+
+impl Entity for ChannelChat {
+ type Event = ChannelChatEvent;
+
+ fn release(&mut self, _: &mut AppContext) {
+ self.rpc
+ .send(proto::LeaveChannelChat {
+ channel_id: self.channel.id,
+ })
+ .log_err();
+ }
+}
+
+impl ChannelChat {
+ pub async fn new(
+ channel: Arc,
+ user_store: ModelHandle,
+ client: Arc,
+ mut cx: AsyncAppContext,
+ ) -> Result> {
+ let channel_id = channel.id;
+ let subscription = client.subscribe_to_entity(channel_id).unwrap();
+
+ let response = client
+ .request(proto::JoinChannelChat { channel_id })
+ .await?;
+ let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?;
+ let loaded_all_messages = response.done;
+
+ Ok(cx.add_model(|cx| {
+ let mut this = Self {
+ channel,
+ user_store,
+ rpc: client,
+ outgoing_messages_lock: Default::default(),
+ messages: Default::default(),
+ loaded_all_messages,
+ next_pending_message_id: 0,
+ rng: StdRng::from_entropy(),
+ _subscription: subscription.set_model(&cx.handle(), &mut cx.to_async()),
+ };
+ this.insert_messages(messages, cx);
+ this
+ }))
+ }
+
+ pub fn channel(&self) -> &Arc {
+ &self.channel
+ }
+
+ pub fn send_message(
+ &mut self,
+ body: String,
+ cx: &mut ModelContext,
+ ) -> Result>> {
+ if body.is_empty() {
+ Err(anyhow!("message body can't be empty"))?;
+ }
+
+ let current_user = self
+ .user_store
+ .read(cx)
+ .current_user()
+ .ok_or_else(|| anyhow!("current_user is not present"))?;
+
+ let channel_id = self.channel.id;
+ let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id));
+ let nonce = self.rng.gen();
+ self.insert_messages(
+ SumTree::from_item(
+ ChannelMessage {
+ id: pending_id,
+ body: body.clone(),
+ sender: current_user,
+ timestamp: OffsetDateTime::now_utc(),
+ nonce,
+ },
+ &(),
+ ),
+ cx,
+ );
+ let user_store = self.user_store.clone();
+ let rpc = self.rpc.clone();
+ let outgoing_messages_lock = self.outgoing_messages_lock.clone();
+ Ok(cx.spawn(|this, mut cx| async move {
+ let outgoing_message_guard = outgoing_messages_lock.lock().await;
+ let request = rpc.request(proto::SendChannelMessage {
+ channel_id,
+ body,
+ nonce: Some(nonce.into()),
+ });
+ let response = request.await?;
+ drop(outgoing_message_guard);
+ let message = ChannelMessage::from_proto(
+ response.message.ok_or_else(|| anyhow!("invalid message"))?,
+ &user_store,
+ &mut cx,
+ )
+ .await?;
+ this.update(&mut cx, |this, cx| {
+ this.insert_messages(SumTree::from_item(message, &()), cx);
+ Ok(())
+ })
+ }))
+ }
+
+ pub fn remove_message(&mut self, id: u64, cx: &mut ModelContext) -> Task> {
+ let response = self.rpc.request(proto::RemoveChannelMessage {
+ channel_id: self.channel.id,
+ message_id: id,
+ });
+ cx.spawn(|this, mut cx| async move {
+ response.await?;
+
+ this.update(&mut cx, |this, cx| {
+ this.message_removed(id, cx);
+ Ok(())
+ })
+ })
+ }
+
+ pub fn load_more_messages(&mut self, cx: &mut ModelContext) -> bool {
+ if !self.loaded_all_messages {
+ let rpc = self.rpc.clone();
+ let user_store = self.user_store.clone();
+ let channel_id = self.channel.id;
+ if let Some(before_message_id) =
+ self.messages.first().and_then(|message| match message.id {
+ ChannelMessageId::Saved(id) => Some(id),
+ ChannelMessageId::Pending(_) => None,
+ })
+ {
+ cx.spawn(|this, mut cx| {
+ async move {
+ let response = rpc
+ .request(proto::GetChannelMessages {
+ channel_id,
+ before_message_id,
+ })
+ .await?;
+ let loaded_all_messages = response.done;
+ let messages =
+ messages_from_proto(response.messages, &user_store, &mut cx).await?;
+ this.update(&mut cx, |this, cx| {
+ this.loaded_all_messages = loaded_all_messages;
+ this.insert_messages(messages, cx);
+ });
+ anyhow::Ok(())
+ }
+ .log_err()
+ })
+ .detach();
+ return true;
+ }
+ }
+ false
+ }
+
+ pub fn rejoin(&mut self, cx: &mut ModelContext) {
+ let user_store = self.user_store.clone();
+ let rpc = self.rpc.clone();
+ let channel_id = self.channel.id;
+ cx.spawn(|this, mut cx| {
+ async move {
+ let response = rpc.request(proto::JoinChannelChat { channel_id }).await?;
+ let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?;
+ let loaded_all_messages = response.done;
+
+ let pending_messages = this.update(&mut cx, |this, cx| {
+ if let Some((first_new_message, last_old_message)) =
+ messages.first().zip(this.messages.last())
+ {
+ if first_new_message.id > last_old_message.id {
+ let old_messages = mem::take(&mut this.messages);
+ cx.emit(ChannelChatEvent::MessagesUpdated {
+ old_range: 0..old_messages.summary().count,
+ new_count: 0,
+ });
+ this.loaded_all_messages = loaded_all_messages;
+ }
+ }
+
+ this.insert_messages(messages, cx);
+ if loaded_all_messages {
+ this.loaded_all_messages = loaded_all_messages;
+ }
+
+ this.pending_messages().cloned().collect::>()
+ });
+
+ for pending_message in pending_messages {
+ let request = rpc.request(proto::SendChannelMessage {
+ channel_id,
+ body: pending_message.body,
+ nonce: Some(pending_message.nonce.into()),
+ });
+ let response = request.await?;
+ let message = ChannelMessage::from_proto(
+ response.message.ok_or_else(|| anyhow!("invalid message"))?,
+ &user_store,
+ &mut cx,
+ )
+ .await?;
+ this.update(&mut cx, |this, cx| {
+ this.insert_messages(SumTree::from_item(message, &()), cx);
+ });
+ }
+
+ anyhow::Ok(())
+ }
+ .log_err()
+ })
+ .detach();
+ }
+
+ pub fn message_count(&self) -> usize {
+ self.messages.summary().count
+ }
+
+ pub fn messages(&self) -> &SumTree {
+ &self.messages
+ }
+
+ pub fn message(&self, ix: usize) -> &ChannelMessage {
+ let mut cursor = self.messages.cursor::();
+ cursor.seek(&Count(ix), Bias::Right, &());
+ cursor.item().unwrap()
+ }
+
+ pub fn messages_in_range(&self, range: Range) -> impl Iterator- {
+ let mut cursor = self.messages.cursor::();
+ cursor.seek(&Count(range.start), Bias::Right, &());
+ cursor.take(range.len())
+ }
+
+ pub fn pending_messages(&self) -> impl Iterator
- {
+ let mut cursor = self.messages.cursor::();
+ cursor.seek(&ChannelMessageId::Pending(0), Bias::Left, &());
+ cursor
+ }
+
+ async fn handle_message_sent(
+ this: ModelHandle,
+ message: TypedEnvelope,
+ _: Arc,
+ mut cx: AsyncAppContext,
+ ) -> Result<()> {
+ let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
+ let message = message
+ .payload
+ .message
+ .ok_or_else(|| anyhow!("empty message"))?;
+
+ let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
+ this.update(&mut cx, |this, cx| {
+ this.insert_messages(SumTree::from_item(message, &()), cx)
+ });
+
+ Ok(())
+ }
+
+ async fn handle_message_removed(
+ this: ModelHandle,
+ message: TypedEnvelope,
+ _: Arc,
+ mut cx: AsyncAppContext,
+ ) -> Result<()> {
+ this.update(&mut cx, |this, cx| {
+ this.message_removed(message.payload.message_id, cx)
+ });
+ Ok(())
+ }
+
+ fn insert_messages(&mut self, messages: SumTree, cx: &mut ModelContext) {
+ if let Some((first_message, last_message)) = messages.first().zip(messages.last()) {
+ let nonces = messages
+ .cursor::<()>()
+ .map(|m| m.nonce)
+ .collect::>();
+
+ let mut old_cursor = self.messages.cursor::<(ChannelMessageId, Count)>();
+ let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left, &());
+ let start_ix = old_cursor.start().1 .0;
+ let removed_messages = old_cursor.slice(&last_message.id, Bias::Right, &());
+ let removed_count = removed_messages.summary().count;
+ let new_count = messages.summary().count;
+ let end_ix = start_ix + removed_count;
+
+ new_messages.append(messages, &());
+
+ let mut ranges = Vec::>::new();
+ if new_messages.last().unwrap().is_pending() {
+ new_messages.append(old_cursor.suffix(&()), &());
+ } else {
+ new_messages.append(
+ old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left, &()),
+ &(),
+ );
+
+ while let Some(message) = old_cursor.item() {
+ let message_ix = old_cursor.start().1 .0;
+ if nonces.contains(&message.nonce) {
+ if ranges.last().map_or(false, |r| r.end == message_ix) {
+ ranges.last_mut().unwrap().end += 1;
+ } else {
+ ranges.push(message_ix..message_ix + 1);
+ }
+ } else {
+ new_messages.push(message.clone(), &());
+ }
+ old_cursor.next(&());
+ }
+ }
+
+ drop(old_cursor);
+ self.messages = new_messages;
+
+ for range in ranges.into_iter().rev() {
+ cx.emit(ChannelChatEvent::MessagesUpdated {
+ old_range: range,
+ new_count: 0,
+ });
+ }
+ cx.emit(ChannelChatEvent::MessagesUpdated {
+ old_range: start_ix..end_ix,
+ new_count,
+ });
+ cx.notify();
+ }
+ }
+
+ fn message_removed(&mut self, id: u64, cx: &mut ModelContext) {
+ let mut cursor = self.messages.cursor::();
+ let mut messages = cursor.slice(&ChannelMessageId::Saved(id), Bias::Left, &());
+ if let Some(item) = cursor.item() {
+ if item.id == ChannelMessageId::Saved(id) {
+ let ix = messages.summary().count;
+ cursor.next(&());
+ messages.append(cursor.suffix(&()), &());
+ drop(cursor);
+ self.messages = messages;
+ cx.emit(ChannelChatEvent::MessagesUpdated {
+ old_range: ix..ix + 1,
+ new_count: 0,
+ });
+ }
+ }
+ }
+}
+
+async fn messages_from_proto(
+ proto_messages: Vec,
+ user_store: &ModelHandle,
+ cx: &mut AsyncAppContext,
+) -> Result> {
+ let unique_user_ids = proto_messages
+ .iter()
+ .map(|m| m.sender_id)
+ .collect::>()
+ .into_iter()
+ .collect();
+ user_store
+ .update(cx, |user_store, cx| {
+ user_store.get_users(unique_user_ids, cx)
+ })
+ .await?;
+
+ let mut messages = Vec::with_capacity(proto_messages.len());
+ for message in proto_messages {
+ messages.push(ChannelMessage::from_proto(message, user_store, cx).await?);
+ }
+ let mut result = SumTree::new();
+ result.extend(messages, &());
+ Ok(result)
+}
+
+impl ChannelMessage {
+ pub async fn from_proto(
+ message: proto::ChannelMessage,
+ user_store: &ModelHandle,
+ cx: &mut AsyncAppContext,
+ ) -> Result {
+ let sender = user_store
+ .update(cx, |user_store, cx| {
+ user_store.get_user(message.sender_id, cx)
+ })
+ .await?;
+ Ok(ChannelMessage {
+ id: ChannelMessageId::Saved(message.id),
+ body: message.body,
+ timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?,
+ sender,
+ nonce: message
+ .nonce
+ .ok_or_else(|| anyhow!("nonce is required"))?
+ .into(),
+ })
+ }
+
+ pub fn is_pending(&self) -> bool {
+ matches!(self.id, ChannelMessageId::Pending(_))
+ }
+}
+
+impl sum_tree::Item for ChannelMessage {
+ type Summary = ChannelMessageSummary;
+
+ fn summary(&self) -> Self::Summary {
+ ChannelMessageSummary {
+ max_id: self.id,
+ count: 1,
+ }
+ }
+}
+
+impl Default for ChannelMessageId {
+ fn default() -> Self {
+ Self::Saved(0)
+ }
+}
+
+impl sum_tree::Summary for ChannelMessageSummary {
+ type Context = ();
+
+ fn add_summary(&mut self, summary: &Self, _: &()) {
+ self.max_id = summary.max_id;
+ self.count += summary.count;
+ }
+}
+
+impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId {
+ fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
+ debug_assert!(summary.max_id > *self);
+ *self = summary.max_id;
+ }
+}
+
+impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count {
+ fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
+ self.0 += summary.count;
+ }
+}
diff --git a/crates/channel/src/channel_store.rs b/crates/channel/src/channel_store.rs
index a4c8da6df4f594553ac3629a4d4fc4a1176d89a3..e61e520b471029e692ecdc29bac58777917d92ba 100644
--- a/crates/channel/src/channel_store.rs
+++ b/crates/channel/src/channel_store.rs
@@ -1,4 +1,4 @@
-use crate::channel_buffer::ChannelBuffer;
+use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
use anyhow::{anyhow, Result};
use client::{Client, Subscription, User, UserId, UserStore};
use collections::{hash_map, HashMap, HashSet};
@@ -20,7 +20,8 @@ pub struct ChannelStore {
channels_with_admin_privileges: HashSet,
outgoing_invites: HashSet<(ChannelId, UserId)>,
update_channels_tx: mpsc::UnboundedSender,
- opened_buffers: HashMap,
+ opened_buffers: HashMap>,
+ opened_chats: HashMap>,
client: Arc,
user_store: ModelHandle,
_rpc_subscription: Subscription,
@@ -50,15 +51,9 @@ impl Entity for ChannelStore {
type Event = ChannelEvent;
}
-pub enum ChannelMemberStatus {
- Invited,
- Member,
- NotMember,
-}
-
-enum OpenedChannelBuffer {
- Open(WeakModelHandle),
- Loading(Shared, Arc>>>),
+enum OpenedModelHandle {
+ Open(WeakModelHandle),
+ Loading(Shared, Arc>>>),
}
impl ChannelStore {
@@ -94,6 +89,7 @@ impl ChannelStore {
channels_with_admin_privileges: Default::default(),
outgoing_invites: Default::default(),
opened_buffers: Default::default(),
+ opened_chats: Default::default(),
update_channels_tx,
client,
user_store,
@@ -115,6 +111,10 @@ impl ChannelStore {
}
}
+ pub fn client(&self) -> Arc {
+ self.client.clone()
+ }
+
pub fn has_children(&self, channel_id: ChannelId) -> bool {
self.channel_paths.iter().any(|path| {
if let Some(ix) = path.iter().position(|id| *id == channel_id) {
@@ -129,6 +129,12 @@ impl ChannelStore {
self.channel_paths.len()
}
+ pub fn index_of_channel(&self, channel_id: ChannelId) -> Option {
+ self.channel_paths
+ .iter()
+ .position(|path| path.ends_with(&[channel_id]))
+ }
+
pub fn channels(&self) -> impl '_ + Iterator
- )> {
self.channel_paths.iter().map(move |path| {
let id = path.last().unwrap();
@@ -154,7 +160,7 @@ impl ChannelStore {
pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool {
if let Some(buffer) = self.opened_buffers.get(&channel_id) {
- if let OpenedChannelBuffer::Open(buffer) = buffer {
+ if let OpenedModelHandle::Open(buffer) = buffer {
return buffer.upgrade(cx).is_some();
}
}
@@ -166,24 +172,62 @@ impl ChannelStore {
channel_id: ChannelId,
cx: &mut ModelContext,
) -> Task>> {
- // Make sure that a given channel buffer is only opened once per
- // app instance, even if this method is called multiple times
- // with the same channel id while the first task is still running.
+ let client = self.client.clone();
+ self.open_channel_resource(
+ channel_id,
+ |this| &mut this.opened_buffers,
+ |channel, cx| ChannelBuffer::new(channel, client, cx),
+ cx,
+ )
+ }
+
+ pub fn open_channel_chat(
+ &mut self,
+ channel_id: ChannelId,
+ cx: &mut ModelContext,
+ ) -> Task>> {
+ let client = self.client.clone();
+ let user_store = self.user_store.clone();
+ self.open_channel_resource(
+ channel_id,
+ |this| &mut this.opened_chats,
+ |channel, cx| ChannelChat::new(channel, user_store, client, cx),
+ cx,
+ )
+ }
+
+ /// Asynchronously open a given resource associated with a channel.
+ ///
+ /// Make sure that the resource is only opened once, even if this method
+ /// is called multiple times with the same channel id while the first task
+ /// is still running.
+ fn open_channel_resource(
+ &mut self,
+ channel_id: ChannelId,
+ get_map: fn(&mut Self) -> &mut HashMap>,
+ load: F,
+ cx: &mut ModelContext,
+ ) -> Task>>
+ where
+ F: 'static + FnOnce(Arc, AsyncAppContext) -> Fut,
+ Fut: Future