Detailed changes
@@ -1,3 +1,4 @@
+use crate::Status;
use crate::{Client, Subscription, User, UserStore};
use anyhow::anyhow;
use anyhow::Result;
@@ -21,7 +22,7 @@ pub struct ChannelStore {
client: Arc<Client>,
user_store: ModelHandle<UserStore>,
_rpc_subscription: Subscription,
- _maintain_user: Task<()>,
+ _watch_connection_status: Task<()>,
}
#[derive(Clone, Debug, PartialEq)]
@@ -57,15 +58,16 @@ impl ChannelStore {
let rpc_subscription =
client.add_message_handler(cx.handle(), Self::handle_update_channels);
- let mut current_user = user_store.read(cx).watch_current_user();
- let maintain_user = cx.spawn_weak(|this, mut cx| async move {
- while let Some(current_user) = current_user.next().await {
- if current_user.is_none() {
+ let mut connection_status = client.status();
+ let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
+ while let Some(status) = connection_status.next().await {
+ if matches!(status, Status::ConnectionLost | Status::SignedOut) {
if let Some(this) = this.upgrade(&cx) {
this.update(&mut cx, |this, cx| {
this.channels.clear();
this.channel_invitations.clear();
this.channel_participants.clear();
+ this.channels_with_admin_privileges.clear();
this.outgoing_invites.clear();
cx.notify();
});
@@ -84,7 +86,7 @@ impl ChannelStore {
client,
user_store,
_rpc_subscription: rpc_subscription,
- _maintain_user: maintain_user,
+ _watch_connection_status: watch_connection_status,
}
}
@@ -2,7 +2,7 @@ mod connection_pool;
use crate::{
auth,
- db::{self, ChannelId, Database, ProjectId, RoomId, ServerId, User, UserId},
+ db::{self, ChannelId, ChannelsForUser, Database, ProjectId, RoomId, ServerId, User, UserId},
executor::Executor,
AppState, Result,
};
@@ -541,8 +541,7 @@ impl Server {
pool.add_connection(connection_id, user_id, user.admin);
this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
this.peer.send(connection_id, build_initial_channels_update(
- channels_for_user.channels,
- channels_for_user.channel_participants,
+ channels_for_user,
channel_invites
))?;
@@ -2537,13 +2536,12 @@ fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
}
fn build_initial_channels_update(
- channels: Vec<db::Channel>,
- channel_participants: HashMap<db::ChannelId, Vec<UserId>>,
+ channels: ChannelsForUser,
channel_invites: Vec<db::Channel>,
) -> proto::UpdateChannels {
let mut update = proto::UpdateChannels::default();
- for channel in channels {
+ for channel in channels.channels {
update.channels.push(proto::Channel {
id: channel.id.to_proto(),
name: channel.name,
@@ -2551,7 +2549,7 @@ fn build_initial_channels_update(
});
}
- for (channel_id, participants) in channel_participants {
+ for (channel_id, participants) in channels.channel_participants {
update
.channel_participants
.push(proto::ChannelParticipants {
@@ -2560,6 +2558,18 @@ fn build_initial_channels_update(
});
}
+ update
+ .channel_permissions
+ .extend(
+ channels
+ .channels_with_admin_privileges
+ .into_iter()
+ .map(|id| proto::ChannelPermission {
+ channel_id: id.to_proto(),
+ is_admin: true,
+ }),
+ );
+
for channel in channel_invites {
update.channel_invitations.push(proto::Channel {
id: channel.id.to_proto(),
@@ -1,8 +1,11 @@
-use crate::tests::{room_participants, RoomParticipants, TestServer};
+use crate::{
+ rpc::RECONNECT_TIMEOUT,
+ tests::{room_participants, RoomParticipants, TestServer},
+};
use call::ActiveCall;
use client::{Channel, ChannelMembership, User};
use gpui::{executor::Deterministic, TestAppContext};
-use rpc::proto;
+use rpc::{proto, RECEIVE_TIMEOUT};
use std::sync::Arc;
#[gpui::test]
@@ -49,7 +52,9 @@ async fn test_core_channels(
depth: 1,
})
]
- )
+ );
+ assert!(channels.is_user_admin(channel_a_id));
+ assert!(channels.is_user_admin(channel_b_id));
});
client_b
@@ -84,6 +89,7 @@ async fn test_core_channels(
})]
)
});
+
let members = client_a
.channel_store()
.update(cx_a, |store, cx| {
@@ -128,7 +134,6 @@ async fn test_core_channels(
id: channel_a_id,
name: "channel-a".to_string(),
parent_id: None,
-
depth: 0,
}),
Arc::new(Channel {
@@ -138,7 +143,9 @@ async fn test_core_channels(
depth: 1,
})
]
- )
+ );
+ assert!(!channels.is_user_admin(channel_a_id));
+ assert!(!channels.is_user_admin(channel_b_id));
});
let channel_c_id = client_a
@@ -280,6 +287,30 @@ async fn test_core_channels(
client_b
.channel_store()
.read_with(cx_b, |channels, _| assert_eq!(channels.channels(), &[]));
+
+ // When disconnected, client A sees no channels.
+ server.forbid_connections();
+ server.disconnect_client(client_a.peer_id().unwrap());
+ deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
+ client_a.channel_store().read_with(cx_a, |channels, _| {
+ assert_eq!(channels.channels(), &[]);
+ assert!(!channels.is_user_admin(channel_a_id));
+ });
+
+ server.allow_connections();
+ deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
+ client_a.channel_store().read_with(cx_a, |channels, _| {
+ assert_eq!(
+ channels.channels(),
+ &[Arc::new(Channel {
+ id: channel_a_id,
+ name: "channel-a".to_string(),
+ parent_id: None,
+ depth: 0,
+ })]
+ );
+ assert!(channels.is_user_admin(channel_a_id));
+ });
}
fn assert_participants_eq(participants: &[Arc<User>], expected_partitipants: &[u64]) {