@@ -4,11 +4,13 @@ use anyhow::anyhow;
use anyhow::Result;
use collections::HashMap;
use collections::HashSet;
+use futures::channel::mpsc;
use futures::Future;
use futures::StreamExt;
use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, Task};
use rpc::{proto, TypedEnvelope};
use std::sync::Arc;
+use util::ResultExt;
pub type ChannelId = u64;
pub type UserId = u64;
@@ -20,10 +22,12 @@ pub struct ChannelStore {
channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
channels_with_admin_privileges: HashSet<ChannelId>,
outgoing_invites: HashSet<(ChannelId, UserId)>,
+ update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
client: Arc<Client>,
user_store: ModelHandle<UserStore>,
_rpc_subscription: Subscription,
_watch_connection_status: Task<()>,
+ _update_channels: Task<()>,
}
#[derive(Clone, Debug, PartialEq)]
@@ -62,6 +66,7 @@ impl ChannelStore {
let rpc_subscription =
client.add_message_handler(cx.handle(), Self::handle_update_channels);
+ let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
let mut connection_status = client.status();
let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
while let Some(status) = connection_status.next().await {
@@ -89,10 +94,23 @@ impl ChannelStore {
channel_participants: Default::default(),
channels_with_admin_privileges: Default::default(),
outgoing_invites: Default::default(),
+ update_channels_tx,
client,
user_store,
_rpc_subscription: rpc_subscription,
_watch_connection_status: watch_connection_status,
+ _update_channels: cx.spawn_weak(|this, mut cx| async move {
+ while let Some(update_channels) = update_channels_rx.next().await {
+ if let Some(this) = this.upgrade(&cx) {
+ let update_task = this.update(&mut cx, |this, cx| {
+ this.update_channels(update_channels, cx)
+ });
+ if let Some(update_task) = update_task {
+ update_task.await.log_err();
+ }
+ }
+ }
+ }),
}
}
@@ -159,13 +177,14 @@ impl ChannelStore {
let channel_id = channel.id;
this.update(&mut cx, |this, cx| {
- this.update_channels(
+ let task = this.update_channels(
proto::UpdateChannels {
channels: vec![channel],
..Default::default()
},
cx,
);
+ assert!(task.is_none());
// This event is emitted because the collab panel wants to clear the pending edit state
// before this frame is rendered. But we can't guarantee that the collab panel's future
@@ -287,13 +306,14 @@ impl ChannelStore {
.channel
.ok_or_else(|| anyhow!("missing channel in response"))?;
this.update(&mut cx, |this, cx| {
- this.update_channels(
+ let task = this.update_channels(
proto::UpdateChannels {
channels: vec![channel],
..Default::default()
},
cx,
);
+ assert!(task.is_none());
// This event is emitted because the collab panel wants to clear the pending edit state
// before this frame is rendered. But we can't guarantee that the collab panel's future
@@ -375,8 +395,10 @@ impl ChannelStore {
_: Arc<Client>,
mut cx: AsyncAppContext,
) -> Result<()> {
- this.update(&mut cx, |this, cx| {
- this.update_channels(message.payload, cx);
+ this.update(&mut cx, |this, _| {
+ this.update_channels_tx
+ .unbounded_send(message.payload)
+ .unwrap();
});
Ok(())
}
@@ -385,7 +407,7 @@ impl ChannelStore {
&mut self,
payload: proto::UpdateChannels,
cx: &mut ModelContext<ChannelStore>,
- ) {
+ ) -> Option<Task<Result<()>>> {
if !payload.remove_channel_invitations.is_empty() {
self.channel_invitations
.retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
@@ -470,6 +492,11 @@ impl ChannelStore {
}
}
+ cx.notify();
+ if payload.channel_participants.is_empty() {
+ return None;
+ }
+
let mut all_user_ids = Vec::new();
let channel_participants = payload.channel_participants;
for entry in &channel_participants {
@@ -480,11 +507,10 @@ impl ChannelStore {
}
}
- // TODO: Race condition if an update channels messages comes in while resolving avatars
let users = self
.user_store
.update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
- cx.spawn(|this, mut cx| async move {
+ Some(cx.spawn(|this, mut cx| async move {
let users = users.await?;
this.update(&mut cx, |this, cx| {
@@ -509,10 +535,7 @@ impl ChannelStore {
cx.notify();
});
anyhow::Ok(())
- })
- .detach();
-
- cx.notify();
+ }))
}
fn channel_path_sorting_key<'a>(
@@ -1156,7 +1156,6 @@ async fn rejoin_room(
channel_members = mem::take(&mut rejoined_room.channel_members);
}
- //TODO: move this into the room guard
if let Some(channel_id) = channel_id {
channel_updated(
channel_id,
@@ -2453,9 +2452,6 @@ async fn join_channel(
joined_room.clone()
};
- // TODO - do this while still holding the room guard,
- // currently there's a possible race condition if someone joins the channel
- // after we've dropped the lock but before we finish sending these updates
channel_updated(
channel_id,
&joined_room.room,
@@ -2748,7 +2744,6 @@ async fn leave_room_for_session(session: &Session) -> Result<()> {
return Ok(());
}
- // TODO - do this while holding the room guard.
if let Some(channel_id) = channel_id {
channel_updated(
channel_id,
@@ -290,6 +290,7 @@ async fn test_core_channels(
);
}
+#[track_caller]
fn assert_participants_eq(participants: &[Arc<User>], expected_partitipants: &[u64]) {
assert_eq!(
participants.iter().map(|p| p.id).collect::<Vec<_>>(),
@@ -297,6 +298,7 @@ fn assert_participants_eq(participants: &[Arc<User>], expected_partitipants: &[u
);
}
+#[track_caller]
fn assert_members_eq(
members: &[ChannelMembership],
expected_members: &[(u64, bool, proto::channel_member::Kind)],