Change summary
crates/client/src/client.rs | 2 ++
crates/rpc/src/peer.rs | 21 +++++++++++++++++++++
crates/rpc/src/proto.rs | 10 ++++++++++
3 files changed, 33 insertions(+)
Detailed changes
@@ -1235,6 +1235,7 @@ impl Client {
subscriber
} else {
log::info!("unhandled message {}", type_name);
+ self.peer.respond_with_unhandled_message(message).log_err();
return;
};
@@ -1278,6 +1279,7 @@ impl Client {
.detach();
} else {
log::info!("unhandled message {}", type_name);
+ self.peer.respond_with_unhandled_message(message).log_err();
}
}
@@ -494,6 +494,27 @@ impl Peer {
Ok(())
}
+ pub fn respond_with_unhandled_message(
+ &self,
+ envelope: Box<dyn AnyTypedEnvelope>,
+ ) -> Result<()> {
+ let connection = self.connection_state(envelope.sender_id())?;
+ let response = proto::Error {
+ message: format!("message {} was not handled", envelope.payload_type_name()),
+ };
+ let message_id = connection
+ .next_message_id
+ .fetch_add(1, atomic::Ordering::SeqCst);
+ connection
+ .outgoing_tx
+ .unbounded_send(proto::Message::Envelope(response.into_envelope(
+ message_id,
+ Some(envelope.message_id()),
+ None,
+ )))?;
+ Ok(())
+ }
+
fn connection_state(&self, connection_id: ConnectionId) -> Result<ConnectionState> {
let connections = self.connections.read();
let connection = connections
@@ -42,6 +42,8 @@ pub trait AnyTypedEnvelope: 'static + Send + Sync {
fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
fn is_background(&self) -> bool;
fn original_sender_id(&self) -> Option<PeerId>;
+ fn sender_id(&self) -> ConnectionId;
+ fn message_id(&self) -> u32;
}
pub enum MessagePriority {
@@ -73,6 +75,14 @@ impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
fn original_sender_id(&self) -> Option<PeerId> {
self.original_sender_id
}
+
+ fn sender_id(&self) -> ConnectionId {
+ self.sender_id
+ }
+
+ fn message_id(&self) -> u32 {
+ self.message_id
+ }
}
impl PeerId {