Detailed changes
@@ -33,6 +33,8 @@ pub struct ChannelStore {
pub channel_index: ChannelIndex,
channel_invitations: Vec<Arc<Channel>>,
channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
+ observed_chat_messages: HashMap<ChannelId, u64>,
+ observed_notes_versions: HashMap<ChannelId, proto::NotesVersion>,
outgoing_invites: HashSet<(ChannelId, UserId)>,
update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
@@ -51,8 +53,8 @@ pub struct Channel {
pub name: SharedString,
pub visibility: proto::ChannelVisibility,
pub role: proto::ChannelRole,
- pub unseen_note_version: Option<(u64, clock::Global)>,
- pub unseen_message_id: Option<u64>,
+ pub latest_message_id: Option<u64>,
+ pub latest_note_version: Option<proto::NotesVersion>,
pub parent_path: Vec<u64>,
}
@@ -137,8 +139,10 @@ impl ChannelStore {
user_store: Model<UserStore>,
cx: &mut ModelContext<Self>,
) -> Self {
- let rpc_subscription =
- client.add_message_handler(cx.weak_model(), Self::handle_update_channels);
+ let rpc_subscriptions = [
+ client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
+ client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
+ ];
let mut connection_status = client.status();
let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
@@ -195,6 +199,8 @@ impl ChannelStore {
.await
.log_err();
}),
+ observed_chat_messages: Default::default(),
+ observed_notes_versions: Default::default(),
}
}
@@ -747,6 +753,19 @@ impl ChannelStore {
Ok(())
}
+ async fn handle_update_user_channels(
+ this: Model<Self>,
+ message: TypedEnvelope<proto::ObservedChannelMessage>,
+ _: Arc<Client>,
+ mut cx: AsyncAppContext,
+ ) -> Result<()> {
+ this.update(&mut cx, |this, _| {
+ // this.seen_channel_message_ids
+ // .insert(message.channel_id, message.message_id);
+ cx.notify();
+ })?;
+ }
+
fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
self.channel_index.clear();
self.channel_invitations.clear();
@@ -911,8 +930,6 @@ impl ChannelStore {
visibility: channel.visibility(),
role: channel.role(),
name: channel.name.into(),
- unseen_note_version: None,
- unseen_message_id: None,
parent_path: channel.parent_path,
}),
),
@@ -113,9 +113,9 @@ impl<'a> ChannelPathsInsertGuard<'a> {
visibility: channel_proto.visibility(),
role: channel_proto.role(),
name: channel_proto.name.into(),
- unseen_note_version: None,
- unseen_message_id: None,
parent_path: channel_proto.parent_path,
+ latest_message_id: channel_proto.latest_message_id,
+ latest_note_version: channel_proto.latest_note_version,
}),
);
self.insert_root(channel_proto.id);
@@ -618,8 +618,8 @@ impl ChannelMember {
pub struct ChannelsForUser {
pub channels: Vec<Channel>,
pub channel_participants: HashMap<ChannelId, Vec<UserId>>,
- pub unseen_buffer_changes: Vec<proto::UnseenChannelBufferChange>,
- pub channel_messages: Vec<proto::UnseenChannelMessage>,
+ pub latest_buffer_versions: Vec<proto::ChannelBufferVersion>,
+ pub latest_channel_messages: Vec<proto::ChannelMessageId>,
}
#[derive(Debug)]
@@ -691,15 +691,13 @@ impl Database {
.unseen_channel_buffer_changes(user_id, &channel_ids, &*tx)
.await?;
- let unseen_messages = self
- .unseen_channel_messages(user_id, &channel_ids, &*tx)
- .await?;
+ let latest_messages = self.latest_channel_messages(&channel_ids, &*tx).await?;
Ok(ChannelsForUser {
channels,
channel_participants,
- unseen_buffer_changes: channel_buffer_changes,
- channel_messages: unseen_messages,
+ latest_buffer_versions: channel_buffer_changes,
+ latest_channel_messages: latest_messages,
})
}
@@ -385,25 +385,11 @@ impl Database {
Ok(())
}
- /// Returns the unseen messages for the given user in the specified channels.
- pub async fn unseen_channel_messages(
+ pub async fn latest_channel_messages(
&self,
- user_id: UserId,
channel_ids: &[ChannelId],
tx: &DatabaseTransaction,
- ) -> Result<Vec<proto::UnseenChannelMessage>> {
- let mut observed_messages_by_channel_id = HashMap::default();
- let mut rows = observed_channel_messages::Entity::find()
- .filter(observed_channel_messages::Column::UserId.eq(user_id))
- .filter(observed_channel_messages::Column::ChannelId.is_in(channel_ids.iter().copied()))
- .stream(&*tx)
- .await?;
-
- while let Some(row) = rows.next().await {
- let row = row?;
- observed_messages_by_channel_id.insert(row.channel_id, row);
- }
- drop(rows);
+ ) -> Result<Vec<proto::ChannelMessageId>> {
let mut values = String::new();
for id in channel_ids {
if !values.is_empty() {
@@ -412,10 +398,6 @@ impl Database {
write!(&mut values, "({})", id).unwrap();
}
- if values.is_empty() {
- return Ok(Default::default());
- }
-
let sql = format!(
r#"
SELECT
@@ -437,26 +419,20 @@ impl Database {
);
let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
- let last_messages = channel_message::Model::find_by_statement(stmt)
- .all(&*tx)
+ let mut last_messages = channel_message::Model::find_by_statement(stmt)
+ .stream(&*tx)
.await?;
- let mut changes = Vec::new();
- for last_message in last_messages {
- if let Some(observed_message) =
- observed_messages_by_channel_id.get(&last_message.channel_id)
- {
- if observed_message.channel_message_id == last_message.id {
- continue;
- }
- }
- changes.push(proto::UnseenChannelMessage {
- channel_id: last_message.channel_id.to_proto(),
- message_id: last_message.id.to_proto(),
+ let mut results = Vec::new();
+ while let Some(result) = last_messages.next().await {
+ let message = result?;
+ results.push(proto::ChannelMessageId {
+ channel_id: message.channel_id.to_proto(),
+ message_id: message.id.to_proto(),
});
}
- Ok(changes)
+ Ok(results)
}
/// Removes the channel message with the given ID.
@@ -2842,25 +2842,27 @@ async fn update_channel_buffer(
let pool = &*session.connection_pool().await;
- broadcast(
- None,
- non_collaborators
- .iter()
- .flat_map(|user_id| pool.user_connection_ids(*user_id)),
- |peer_id| {
- session.peer.send(
- peer_id.into(),
- proto::UpdateChannels {
- unseen_channel_buffer_changes: vec![proto::UnseenChannelBufferChange {
- channel_id: channel_id.to_proto(),
- epoch: epoch as u64,
- version: version.clone(),
- }],
- ..Default::default()
- },
- )
- },
- );
+ todo!()
+
+ // broadcast(
+ // None,
+ // non_collaborators
+ // .iter()
+ // .flat_map(|user_id| pool.user_connection_ids(*user_id)),
+ // |peer_id| {
+ // session.peer.send(
+ // peer_id.into(),
+ // proto::UpdateChannels {
+ // unseen_channel_buffer_changes: vec![proto::UnseenChannelBufferChange {
+ // channel_id: channel_id.to_proto(),
+ // epoch: epoch as u64,
+ // version: version.clone(),
+ // }],
+ // ..Default::default()
+ // },
+ // )
+ // },
+ // );
Ok(())
}
@@ -3315,8 +3317,8 @@ fn build_channels_update(
update.channels.push(channel.to_proto());
}
- update.unseen_channel_buffer_changes = channels.unseen_buffer_changes;
- update.unseen_channel_messages = channels.channel_messages;
+ update.latest_channel_buffer_versions = channels.latest_buffer_versions;
+ update.latest_channel_message_ids = channels.latest_channel_messages;
for (channel_id, participants) in channels.channel_participants {
update
@@ -181,7 +181,9 @@ message Envelope {
MarkNotificationRead mark_notification_read = 153;
LspExtExpandMacro lsp_ext_expand_macro = 154;
LspExtExpandMacroResponse lsp_ext_expand_macro_response = 155;
- SetRoomParticipantRole set_room_participant_role = 156; // Current max
+ SetRoomParticipantRole set_room_participant_role = 156;
+
+ UpdateUserChannels update_user_channels = 157; // current max
}
}
@@ -992,19 +994,18 @@ message UpdateChannels {
repeated Channel channel_invitations = 5;
repeated uint64 remove_channel_invitations = 6;
repeated ChannelParticipants channel_participants = 7;
- repeated UnseenChannelMessage unseen_channel_messages = 9;
- repeated UnseenChannelBufferChange unseen_channel_buffer_changes = 10;
+ repeated ChannelMessageId latest_channel_message_ids = 8;
+ repeated ChannelBufferVersion latest_channel_buffer_versions = 9;
}
-message UnseenChannelMessage {
- uint64 channel_id = 1;
- uint64 message_id = 2;
+message UpdateUserChannels {
+ repeated ChannelMessageId observed_channel_message_id = 1;
+ repeated ChannelBufferVersion observed_channel_buffer_version = 2;
}
-message UnseenChannelBufferChange {
+message ChannelMessageId {
uint64 channel_id = 1;
- uint64 epoch = 2;
- repeated VectorClockEntry version = 3;
+ uint64 message_id = 2;
}
message ChannelPermission {
@@ -269,6 +269,7 @@ messages!(
(UpdateChannelBuffer, Foreground),
(UpdateChannelBufferCollaborators, Foreground),
(UpdateChannels, Foreground),
+ (UpdateUserChannels, Foreground),
(UpdateContacts, Foreground),
(UpdateDiagnosticSummary, Foreground),
(UpdateDiffBase, Foreground),
@@ -11,4 +11,4 @@ pub use notification::*;
pub use peer::*;
mod macros;
-pub const PROTOCOL_VERSION: u32 = 67;
+pub const PROTOCOL_VERSION: u32 = 68;