diff --git a/xmpp/src/component/mod.rs b/xmpp/src/component/mod.rs index 21b3792ef68c867431a487367e03c9bf3f711dce..f1494c9b78b49a7455199330be1d48eccee63f66 100644 --- a/xmpp/src/component/mod.rs +++ b/xmpp/src/component/mod.rs @@ -1,18 +1,45 @@ pub mod resource; +mod stanza_matcher; mod system; +pub use stanza_matcher::{ + HCons, HNil, Matcher, MatcherRegistry, StanzaDispatch, StanzaMatcher, StanzaMatcherPlugin, +}; + #[cfg(test)] mod test { - use crate::component::resource::{ComponentConfig, IncomingStanza, XmppComponent}; - use super::*; - use bevy_app::{App, RunMode, ScheduleRunnerPlugin, Startup, Update}; - use bevy_ecs::message::MessageReader; + use crate::component::resource::{ComponentConfig, XmppComponent}; + use bevy_app::{App, RunMode, ScheduleRunnerPlugin}; + use bevy_ecs::message::{Message, MessageReader}; use bevy_tokio_tasks::TokioTasksPlugin; + use tokio_xmpp::Stanza; + + #[derive(Message)] + struct TestMessage { + content: String, + } + + #[derive(Clone)] + struct TestMatcher; + + impl StanzaMatcher for TestMatcher { + type Message = TestMessage; + + fn matches(&mut self, candidate: &Stanza) -> Option { + match candidate { + Stanza::Message(msg) => { + let body = msg.bodies.get("")?.clone(); + Some(TestMessage { content: body }) + } + _ => None, + } + } + } - fn log_incoming_stanzas(mut stanzas: MessageReader) { - for IncomingStanza(s) in stanzas.read() { - println!("Received stanza: {s:?}"); + fn log_test_messages(mut messages: MessageReader) { + for msg in messages.read() { + println!("Received message: {}", msg.content); } } @@ -22,16 +49,13 @@ mod test { App::new() .add_plugins(ScheduleRunnerPlugin { - run_mode: RunMode::Loop { - wait: None - } - }) + run_mode: RunMode::Loop { wait: None }, + }) .add_plugins(TokioTasksPlugin::default()) - .add_message::() + .add_plugins(StanzaMatcherPlugin::new().matcher(TestMatcher)) .init_resource::() .init_resource::() - .add_systems(Startup, system::spawn_stanza_workers) - .add_systems(Update, log_incoming_stanzas) + .add_systems(bevy_app::Update, log_test_messages) .run(); } } diff --git a/xmpp/src/component/resource/component_connection.rs b/xmpp/src/component/resource/component_connection.rs index 9350cd24af55b269b82906218b12ab951deaf765..b5d0d0e1b5000ea3e1ad3a2a82d22d4b6a5ffd83 100644 --- a/xmpp/src/component/resource/component_connection.rs +++ b/xmpp/src/component/resource/component_connection.rs @@ -2,13 +2,6 @@ use bevy_ecs::{prelude::Message, resource::Resource}; use tokio::sync::mpsc; use tokio_xmpp::{Stanza, jid::BareJid}; -#[derive(Resource)] -pub struct ComponentConnection { - pub jid: BareJid, - pub incoming_rx: mpsc::Receiver, - pub outgoing_tx: mpsc::Sender, -} - #[derive(Message)] pub struct IncomingStanza(pub Stanza); diff --git a/xmpp/src/component/stanza_matcher.rs b/xmpp/src/component/stanza_matcher.rs new file mode 100644 index 0000000000000000000000000000000000000000..c156083c4359f48c644201f6e2714ea9e025e13b --- /dev/null +++ b/xmpp/src/component/stanza_matcher.rs @@ -0,0 +1,156 @@ +use bevy_app::{App, Plugin, Startup}; +use bevy_ecs::message::Message; +use bevy_ecs::prelude::Messages; +use bevy_ecs::resource::Resource; +use bevy_ecs::world::World; +use tokio_xmpp::Stanza; + +use crate::component::system::spawn_stanza_workers; + +pub trait StanzaMatcher: Send + Sync + 'static { + type Message: Message; + fn matches(&mut self, candidate: &Stanza) -> Option; +} + +// ============================================================================= +// HList Structure +// ============================================================================= + +pub struct HNil; + +pub struct HCons { + pub head: Head, + pub tail: Tail, +} + +pub struct Matcher(pub M); + +// ============================================================================= +// Dispatch Trait +// ============================================================================= + +pub trait StanzaDispatch: Send + Sync + 'static { + fn dispatch(&mut self, stanza: &Stanza, world: &mut World); + fn register(app: &mut App); +} + +impl StanzaDispatch for HNil { + fn dispatch(&mut self, _stanza: &Stanza, _world: &mut World) {} + fn register(_app: &mut App) {} +} + +impl StanzaDispatch for HCons, Tail> +where + M: StanzaMatcher, + Tail: StanzaDispatch, +{ + fn dispatch(&mut self, stanza: &Stanza, world: &mut World) { + if let Some(msg) = self.head.0.matches(stanza) { + world.resource_mut::>().write(msg); + return; + } + self.tail.dispatch(stanza, world) + } + + fn register(app: &mut App) { + app.add_message::(); + Tail::register(app); + } +} + +// ============================================================================= +// Builder API +// ============================================================================= + +impl HNil { + pub fn matcher(self, m: M) -> HCons, HNil> { + HCons { + head: Matcher(m), + tail: HNil, + } + } +} + +impl HCons { + pub fn matcher(self, m: M) -> HCons, Self> { + HCons { + head: Matcher(m), + tail: self, + } + } +} + +// ============================================================================= +// Resource Wrapper +// ============================================================================= + +#[derive(Resource)] +pub struct MatcherRegistry { + pub dispatchers: D, +} + +// ============================================================================= +// Plugin +// ============================================================================= + +pub struct StanzaMatcherPlugin { + dispatchers: D, +} + +impl StanzaMatcherPlugin { + pub fn new() -> Self { + StanzaMatcherPlugin { dispatchers: HNil } + } +} + +impl Default for StanzaMatcherPlugin { + fn default() -> Self { + Self::new() + } +} + +impl StanzaMatcherPlugin { + pub fn matcher(self, m: M) -> StanzaMatcherPlugin, D>> { + StanzaMatcherPlugin { + dispatchers: HCons { + head: Matcher(m), + tail: self.dispatchers, + }, + } + } +} + +impl Plugin for StanzaMatcherPlugin { + fn build(&self, app: &mut App) { + D::register(app); + app.insert_resource(MatcherRegistry { + dispatchers: self.dispatchers.clone(), + }); + app.add_systems(Startup, spawn_stanza_workers::); + } +} + +// ============================================================================= +// Clone Impls +// ============================================================================= + +impl Clone for HNil { + fn clone(&self) -> Self { + HNil + } +} + +impl Clone for Matcher { + fn clone(&self) -> Self { + Matcher(self.0.clone()) + } +} + +impl Clone for HCons { + fn clone(&self) -> Self { + HCons { + head: self.head.clone(), + tail: self.tail.clone(), + } + } +} diff --git a/xmpp/src/component/system.rs b/xmpp/src/component/system.rs index 1a775b7b0a5c88c4f9b84bd7c55118139aa668e6..9c3bf80bbc97b2baf014867a7af63d977b43649b 100644 --- a/xmpp/src/component/system.rs +++ b/xmpp/src/component/system.rs @@ -1,11 +1,11 @@ -use bevy_ecs::prelude::Messages; use bevy_ecs::world::World; use bevy_tokio_tasks::TokioTasksRuntime; use futures::{SinkExt, StreamExt}; -use crate::component::resource::{ComponentConfig, IncomingStanza, XmppComponent}; +use crate::component::resource::{ComponentConfig, XmppComponent}; +use crate::component::stanza_matcher::{MatcherRegistry, StanzaDispatch}; -pub fn spawn_stanza_workers(world: &mut World) { +pub(crate) fn spawn_stanza_workers(world: &mut World) { let Some(xmpp) = world.remove_resource::() else { panic!("Expected XmppComponent resource to exist"); }; @@ -25,10 +25,14 @@ pub fn spawn_stanza_workers(world: &mut World) { while let Some(stanzas) = chunked.next().await { ctx.run_on_main_thread(move |main_ctx| { - main_ctx - .world - .resource_mut::>() - .write_batch(stanzas.into_iter().map(IncomingStanza)); + let world = &mut *main_ctx.world; + let mut registry = world + .remove_resource::>() + .expect("MatcherRegistry should exist"); + for stanza in stanzas { + registry.dispatchers.dispatch(&stanza, world); + } + world.insert_resource(registry); }) .await; }