Detailed changes
@@ -1,18 +1,45 @@
pub mod resource;
+mod stanza_matcher;
mod system;
+pub use stanza_matcher::{
+ HCons, HNil, Matcher, MatcherRegistry, StanzaDispatch, StanzaMatcher, StanzaMatcherPlugin,
+};
+
#[cfg(test)]
mod test {
- use crate::component::resource::{ComponentConfig, IncomingStanza, XmppComponent};
-
use super::*;
- use bevy_app::{App, RunMode, ScheduleRunnerPlugin, Startup, Update};
- use bevy_ecs::message::MessageReader;
+ use crate::component::resource::{ComponentConfig, XmppComponent};
+ use bevy_app::{App, RunMode, ScheduleRunnerPlugin};
+ use bevy_ecs::message::{Message, MessageReader};
use bevy_tokio_tasks::TokioTasksPlugin;
+ use tokio_xmpp::Stanza;
+
+ #[derive(Message)]
+ struct TestMessage {
+ content: String,
+ }
+
+ #[derive(Clone)]
+ struct TestMatcher;
+
+ impl StanzaMatcher for TestMatcher {
+ type Message = TestMessage;
+
+ fn matches(&mut self, candidate: &Stanza) -> Option<Self::Message> {
+ match candidate {
+ Stanza::Message(msg) => {
+ let body = msg.bodies.get("")?.clone();
+ Some(TestMessage { content: body })
+ }
+ _ => None,
+ }
+ }
+ }
- fn log_incoming_stanzas(mut stanzas: MessageReader<IncomingStanza>) {
- for IncomingStanza(s) in stanzas.read() {
- println!("Received stanza: {s:?}");
+ fn log_test_messages(mut messages: MessageReader<TestMessage>) {
+ for msg in messages.read() {
+ println!("Received message: {}", msg.content);
}
}
@@ -22,16 +49,13 @@ mod test {
App::new()
.add_plugins(ScheduleRunnerPlugin {
- run_mode: RunMode::Loop {
- wait: None
- }
- })
+ run_mode: RunMode::Loop { wait: None },
+ })
.add_plugins(TokioTasksPlugin::default())
- .add_message::<IncomingStanza>()
+ .add_plugins(StanzaMatcherPlugin::new().matcher(TestMatcher))
.init_resource::<ComponentConfig>()
.init_resource::<XmppComponent>()
- .add_systems(Startup, system::spawn_stanza_workers)
- .add_systems(Update, log_incoming_stanzas)
+ .add_systems(bevy_app::Update, log_test_messages)
.run();
}
}
@@ -2,13 +2,6 @@ use bevy_ecs::{prelude::Message, resource::Resource};
use tokio::sync::mpsc;
use tokio_xmpp::{Stanza, jid::BareJid};
-#[derive(Resource)]
-pub struct ComponentConnection {
- pub jid: BareJid,
- pub incoming_rx: mpsc::Receiver<Stanza>,
- pub outgoing_tx: mpsc::Sender<Stanza>,
-}
-
#[derive(Message)]
pub struct IncomingStanza(pub Stanza);
@@ -0,0 +1,156 @@
+use bevy_app::{App, Plugin, Startup};
+use bevy_ecs::message::Message;
+use bevy_ecs::prelude::Messages;
+use bevy_ecs::resource::Resource;
+use bevy_ecs::world::World;
+use tokio_xmpp::Stanza;
+
+use crate::component::system::spawn_stanza_workers;
+
+pub trait StanzaMatcher: Send + Sync + 'static {
+ type Message: Message;
+ fn matches(&mut self, candidate: &Stanza) -> Option<Self::Message>;
+}
+
+// =============================================================================
+// HList Structure
+// =============================================================================
+
+pub struct HNil;
+
+pub struct HCons<Head, Tail> {
+ pub head: Head,
+ pub tail: Tail,
+}
+
+pub struct Matcher<M>(pub M);
+
+// =============================================================================
+// Dispatch Trait
+// =============================================================================
+
+pub trait StanzaDispatch: Send + Sync + 'static {
+ fn dispatch(&mut self, stanza: &Stanza, world: &mut World);
+ fn register(app: &mut App);
+}
+
+impl StanzaDispatch for HNil {
+ fn dispatch(&mut self, _stanza: &Stanza, _world: &mut World) {}
+ fn register(_app: &mut App) {}
+}
+
+impl<M, Tail> StanzaDispatch for HCons<Matcher<M>, Tail>
+where
+ M: StanzaMatcher,
+ Tail: StanzaDispatch,
+{
+ fn dispatch(&mut self, stanza: &Stanza, world: &mut World) {
+ if let Some(msg) = self.head.0.matches(stanza) {
+ world.resource_mut::<Messages<M::Message>>().write(msg);
+ return;
+ }
+ self.tail.dispatch(stanza, world)
+ }
+
+ fn register(app: &mut App) {
+ app.add_message::<M::Message>();
+ Tail::register(app);
+ }
+}
+
+// =============================================================================
+// Builder API
+// =============================================================================
+
+impl HNil {
+ pub fn matcher<M: StanzaMatcher>(self, m: M) -> HCons<Matcher<M>, HNil> {
+ HCons {
+ head: Matcher(m),
+ tail: HNil,
+ }
+ }
+}
+
+impl<Head, Tail> HCons<Head, Tail> {
+ pub fn matcher<M: StanzaMatcher>(self, m: M) -> HCons<Matcher<M>, Self> {
+ HCons {
+ head: Matcher(m),
+ tail: self,
+ }
+ }
+}
+
+// =============================================================================
+// Resource Wrapper
+// =============================================================================
+
+#[derive(Resource)]
+pub struct MatcherRegistry<D: StanzaDispatch> {
+ pub dispatchers: D,
+}
+
+// =============================================================================
+// Plugin
+// =============================================================================
+
+pub struct StanzaMatcherPlugin<D> {
+ dispatchers: D,
+}
+
+impl StanzaMatcherPlugin<HNil> {
+ pub fn new() -> Self {
+ StanzaMatcherPlugin { dispatchers: HNil }
+ }
+}
+
+impl Default for StanzaMatcherPlugin<HNil> {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl<D> StanzaMatcherPlugin<D> {
+ pub fn matcher<M: StanzaMatcher>(self, m: M) -> StanzaMatcherPlugin<HCons<Matcher<M>, D>> {
+ StanzaMatcherPlugin {
+ dispatchers: HCons {
+ head: Matcher(m),
+ tail: self.dispatchers,
+ },
+ }
+ }
+}
+
+impl<D: StanzaDispatch + Clone> Plugin for StanzaMatcherPlugin<D> {
+ fn build(&self, app: &mut App) {
+ D::register(app);
+ app.insert_resource(MatcherRegistry {
+ dispatchers: self.dispatchers.clone(),
+ });
+ app.add_systems(Startup, spawn_stanza_workers::<D>);
+ }
+}
+
+// =============================================================================
+// Clone Impls
+// =============================================================================
+
+impl Clone for HNil {
+ fn clone(&self) -> Self {
+ HNil
+ }
+}
+
+impl<M: Clone> Clone for Matcher<M> {
+ fn clone(&self) -> Self {
+ Matcher(self.0.clone())
+ }
+}
+
+impl<Head: Clone, Tail: Clone> Clone for HCons<Head, Tail> {
+ fn clone(&self) -> Self {
+ HCons {
+ head: self.head.clone(),
+ tail: self.tail.clone(),
+ }
+ }
+}
@@ -1,11 +1,11 @@
-use bevy_ecs::prelude::Messages;
use bevy_ecs::world::World;
use bevy_tokio_tasks::TokioTasksRuntime;
use futures::{SinkExt, StreamExt};
-use crate::component::resource::{ComponentConfig, IncomingStanza, XmppComponent};
+use crate::component::resource::{ComponentConfig, XmppComponent};
+use crate::component::stanza_matcher::{MatcherRegistry, StanzaDispatch};
-pub fn spawn_stanza_workers(world: &mut World) {
+pub(crate) fn spawn_stanza_workers<D: StanzaDispatch>(world: &mut World) {
let Some(xmpp) = world.remove_resource::<XmppComponent>() else {
panic!("Expected XmppComponent resource to exist");
};
@@ -25,10 +25,14 @@ pub fn spawn_stanza_workers(world: &mut World) {
while let Some(stanzas) = chunked.next().await {
ctx.run_on_main_thread(move |main_ctx| {
- main_ctx
- .world
- .resource_mut::<Messages<IncomingStanza>>()
- .write_batch(stanzas.into_iter().map(IncomingStanza));
+ let world = &mut *main_ctx.world;
+ let mut registry = world
+ .remove_resource::<MatcherRegistry<D>>()
+ .expect("MatcherRegistry should exist");
+ for stanza in stanzas {
+ registry.dispatchers.dispatch(&stanza, world);
+ }
+ world.insert_resource(registry);
})
.await;
}