wip: oh yeah, this might actually be good

Phillip Davis created

Change summary

xmpp/Cargo.toml                                     |   2 
xmpp/src/component.rs                               | 249 --------------
xmpp/src/component/mod.rs                           |  77 ++++
xmpp/src/component/resource/component_connection.rs |  26 +
xmpp/src/component/resource/mod.rs                  |   1 
xmpp/src/component/system.rs                        |  30 +
xmpp/src/component/task.rs                          |  25 +
7 files changed, 161 insertions(+), 249 deletions(-)

Detailed changes

xmpp/Cargo.toml 🔗

@@ -23,6 +23,8 @@ tokio-util = { version = "0.7", features = ["codec"] }
 # same repository dependencies
 tokio-xmpp = { version = "5.0", features = ["component", "insecure-tcp"], path = "../tokio-xmpp", default-features = false }
 thiserror = "2.0"
+bevy_ecs = "0.18.0"
+bevy_app = "0.18.0"
 
 [dev-dependencies]
 env_logger = { version = "0.11", default-features = false, features = ["auto-color", "humantime"] }

xmpp/src/component.rs 🔗

@@ -1,249 +0,0 @@
-use futures::stream::StreamExt;
-use std::{fmt::Debug, marker::PhantomData};
-
-use thiserror::Error;
-use tokio_xmpp::{
-    Component as TokioComponent, Stanza,
-    connect::ServerConnector,
-    jid::{BareJid, Jid},
-    parsers::{
-        data_forms::DataForm,
-        ibr::{FormQuery, LegacyQuery},
-        iq::Iq,
-        ns,
-        stanza_error::{DefinedCondition, ErrorType, StanzaError},
-    },
-    xmlstream::Timeouts,
-};
-
-pub trait IbrData: Clone + Send + Sync + Sized {}
-
-pub trait LegacyIbrData: Clone + Send + Sync + Sized + IbrData {
-    fn from_legacy(query: &LegacyQuery) -> Self;
-}
-
-pub trait DataFormIbrData: Clone + Send + Sync + Sized + IbrData {
-    fn from_data_form(form: &FormQuery) -> Self;
-}
-
-pub trait IbrStore: Send + Sync {
-    type Data: IbrData;
-
-    type E: std::error::Error + Send + Sync;
-
-    fn get(&self, jid: &BareJid) -> impl Future<Output = Result<Option<Self::Data>, Self::E>>;
-
-    fn register(
-        &self,
-        jid: &BareJid,
-        data: Self::Data,
-    ) -> impl Future<Output = Result<(), Self::E>>;
-
-    fn unregister(
-        &self,
-        jid: &BareJid,
-        data: Self::Data,
-    ) -> impl Future<Output = Result<(), Self::E>>;
-}
-
-// Must be empty
-pub enum IbrFields {
-    Legacy(LegacyQuery),
-    DataForm(DataForm),
-}
-
-// Must not be empty
-pub enum IbrSubmission {
-    Legacy(LegacyQuery),
-    DataForm(DataForm),
-}
-
-/// Errors that can occur during IBR operations.                                             
-#[derive(Debug)]
-pub enum IbrError {
-    /// User is not registered (for cancel/change operations).                               
-    NotRegistered,
-    /// Username conflict.                                                                   
-    Conflict,
-    /// Required fields missing.                                                             
-    NotAcceptable,
-    /// Unsupported submission type (e.g., legacy when only DataForm is implemented).        
-    UnsupportedSubmissionType,
-    /// Storage error.                                                                       
-    Store(Box<dyn std::error::Error + Send + Sync>),
-}
-
-pub trait IbrHandler: IbrStore {
-    fn fields(
-        &self,
-        from: &BareJid,
-        status: Option<Self::Data>,
-    ) -> impl Future<Output = IbrFields> + Send;
-
-    fn on_register(
-        &self,
-        from: &BareJid,
-        submission: IbrSubmission,
-    ) -> impl Future<Output = Result<(), IbrError>>;
-
-    fn on_cancel(&self, from: &BareJid) -> impl Future<Output = Result<(), IbrError>> + Send;
-}
-
-impl IbrData for () {}
-
-#[derive(Debug, Error)]
-pub enum IbrNotSupportedShouldBeUnreachableError {
-    #[error("If you see this, there is a serious bug in xmpp-rs. Please report")]
-    CriticalUnreachableError,
-}
-
-pub struct IbrNotSupported {}
-
-impl IbrStore for IbrNotSupported {
-    type Data = ();
-    type E = IbrNotSupportedShouldBeUnreachableError;
-
-    fn get(&self, _: &BareJid) -> impl Future<Output = Result<Option<Self::Data>, Self::E>> {
-        unreachable!();
-        std::future::ready(Err(
-            IbrNotSupportedShouldBeUnreachableError::CriticalUnreachableError,
-        ))
-    }
-
-    fn register(&self, _: &BareJid, _: Self::Data) -> impl Future<Output = Result<(), Self::E>> {
-        unreachable!();
-        std::future::ready(Err(
-            IbrNotSupportedShouldBeUnreachableError::CriticalUnreachableError,
-        ))
-    }
-
-    fn unregister(&self, _: &BareJid, _: Self::Data) -> impl Future<Output = Result<(), Self::E>> {
-        unreachable!();
-        std::future::ready(Err(
-            IbrNotSupportedShouldBeUnreachableError::CriticalUnreachableError,
-        ))
-    }
-}
-
-impl IbrHandler for IbrNotSupported {}
-
-#[derive(Debug)]
-pub enum ComponentError {
-    IbrError(IbrError),
-}
-
-struct Config<C, Handler>
-where
-    C: ServerConnector,
-    Handler: ComponentHandler<C>,
-{
-    jid: Jid,
-    password: String,
-    hander: Handler,
-    phantom_connector: PhantomData<C>,
-}
-
-pub enum Continuation {
-    Next(Stanza),
-    Error(StanzaError),
-}
-
-pub trait ComponentHandler<C>: Sized
-where
-    C: ServerConnector,
-{
-    fn handle_register() -> impl Future<Output = Result<(), ComponentError>>;
-
-    async fn handle_iq(&mut self, iq: Iq) -> Result<Continuation, ComponentError> {
-        match iq {
-            Iq::Get {
-                from,
-                to,
-                id,
-                payload,
-            } => match (payload.name(), payload.ns().as_str()) {
-                ("query", ns::REGISTER) => {
-                    let fields = self.ibr_fields().await;
-                    todo!()
-                }
-                _ => {
-                    let err = StanzaError::new(
-                        ErrorType::Cancel,
-                        DefinedCondition::ServiceUnavailable,
-                        "en",
-                        "No handler defined for this kind of iq.",
-                    );
-
-                    Ok(Continuation::Error(err))
-                }
-            },
-
-            _ => todo!(),
-        }
-    }
-}
-
-struct Component<C, Handler, Ibr>
-where
-    C: ServerConnector,
-    Handler: ComponentHandler<C>,
-    Ibr: IbrHandler,
-{
-    jid: Jid,
-    password: String,
-    handler: Handler,
-    inner: TokioComponent<C>, // low-level component implementation
-    ibr: Ibr,
-}
-
-impl<C, Handler> Component<C, Handler, Ibr>
-where
-    C: ServerConnector,
-    Handler: ComponentHandler<C>,
-    Ibr: IbrHandler,
-{
-    async fn run(mut self) -> Result<(), ComponentError> {
-        while let Some(stanza) = self.inner.next().await {
-            match stanza {
-                Stanza::Iq(iq) => {
-                    self.handler.handle_iq(iq);
-                }
-                _ => todo!(),
-            }
-        }
-
-        Ok(())
-    }
-}
-
-impl<C, Handler> Config<C, Handler, Ibr>
-where
-    C: ServerConnector,
-    Handler: ComponentHandler<C>,
-    Ibr: IbrHandler,
-{
-    pub async fn run(
-        self,
-        connector: C,
-        timeouts: Timeouts,
-        handler: Handler,
-    ) -> Result<(), ComponentError> {
-        let inner_component = TokioComponent::new_with_connector(
-            &self.jid.as_str(),
-            &self.password.as_str(),
-            connector,
-            timeouts,
-        )
-        .await
-        .unwrap(); // TODO: Handle this error
-
-        let inner_component = Component {
-            jid: self.jid,
-            password: self.password,
-            handler,
-            inner: inner_component,
-        };
-
-        inner_component.run().await
-    }
-}

xmpp/src/component/mod.rs 🔗

@@ -0,0 +1,77 @@
+use bevy_app::{App, Plugin, Update};
+use bevy_ecs::resource::Resource;
+use futures::StreamExt;
+use tokio::runtime::Runtime;
+use tokio::sync::mpsc;
+use tokio_xmpp::{Component as TokioComponent, connect::TcpServerConnector, jid::BareJid};
+
+pub mod resource;
+mod system;
+mod task;
+
+pub use resource::component_connection::{ComponentConfig, ComponentConnection, IncomingStanza};
+pub use system::receive_stanzas;
+
+#[derive(Resource)]
+pub struct AsyncRuntime(pub Runtime);
+
+impl From<Runtime> for AsyncRuntime {
+    fn from(value: Runtime) -> Self {
+        Self(value)
+    }
+}
+
+pub struct ComponentPlugin;
+
+impl Plugin for ComponentPlugin {
+    fn build(&self, app: &mut App) {
+        app.init_resource::<ComponentConfig>()
+            .add_message::<IncomingStanza>()
+            .add_systems(Update, receive_stanzas);
+    }
+}
+
+pub fn setup_component_connection(
+    runtime: &Runtime,
+    component: TokioComponent<TcpServerConnector>,
+    buffer_size: usize,
+) -> ComponentConnection {
+    let (incoming_tx, incoming_rx) = mpsc::channel(buffer_size);
+    let (outgoing_tx, outgoing_rx) = mpsc::channel(buffer_size);
+
+    let jid: BareJid = component.jid.clone().into_bare();
+    let (sink, stream) = component.split();
+
+    runtime.spawn(task::incoming_stanza_task(stream, incoming_tx));
+    runtime.spawn(task::outgoing_stanza_task(sink, outgoing_rx));
+
+    ComponentConnection {
+        jid,
+        incoming_rx,
+        outgoing_tx,
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use bevy_app::App;
+
+    #[test]
+    fn api_sketching() {
+        let runtime = Runtime::new().unwrap();
+        println!("about to connect");
+        let component = runtime
+            .block_on(TokioComponent::new("sgxbwmsgsv2.localhost", "secret"))
+            .unwrap();
+        let conn = setup_component_connection(&runtime, component, 256);
+
+        println!("finished connecting");
+
+        App::new()
+            .insert_resource(conn)
+            .insert_resource(AsyncRuntime::from(runtime))
+            .add_plugins(ComponentPlugin)
+            .run();
+    }
+}

xmpp/src/component/resource/component_connection.rs 🔗

@@ -0,0 +1,26 @@
+use bevy_ecs::{prelude::Message, resource::Resource};
+use tokio::sync::mpsc;
+use tokio_xmpp::{Stanza, jid::BareJid};
+
+#[derive(Resource)]
+pub struct ComponentConnection {
+    pub jid: BareJid,
+    pub incoming_rx: mpsc::Receiver<Stanza>,
+    pub outgoing_tx: mpsc::Sender<Stanza>,
+}
+
+#[derive(Message)]
+pub struct IncomingStanza(pub Stanza);
+
+#[derive(Resource, Clone)]
+pub struct ComponentConfig {
+    pub max_stanzas_per_tick: usize,
+}
+
+impl Default for ComponentConfig {
+    fn default() -> Self {
+        Self {
+            max_stanzas_per_tick: 100,
+        }
+    }
+}

xmpp/src/component/system.rs 🔗

@@ -0,0 +1,30 @@
+use bevy_ecs::prelude::*;
+use futures::stream::Empty;
+use tokio::sync::mpsc::error::TryRecvError;
+
+use crate::component::resource::component_connection::{
+    ComponentConfig, ComponentConnection, IncomingStanza,
+};
+
+pub fn receive_stanzas(
+    mut conn: ResMut<ComponentConnection>,
+    mut messages: MessageWriter<IncomingStanza>,
+    config: Res<ComponentConfig>,
+) {
+    let mut count = 0;
+    while count < config.max_stanzas_per_tick {
+        println!("About to pull another stanza off the stack!");
+        match conn.incoming_rx.try_recv() {
+            Ok(stanza) => {
+                messages.write(IncomingStanza(stanza));
+                count += 1;
+            }
+            Err(TryRecvError::Empty) => {
+                info!("Empty incoming stream!")
+            }
+            Err(TryRecvError::Disconnected) => {
+                panic!()
+            }
+        }
+    }
+}

xmpp/src/component/task.rs 🔗

@@ -0,0 +1,25 @@
+use futures::{Sink, SinkExt, Stream, StreamExt};
+use tokio::sync::mpsc;
+use tokio_xmpp::{Error, Stanza};
+
+pub async fn incoming_stanza_task(
+    mut stream: impl Stream<Item = Stanza> + Unpin,
+    tx: mpsc::Sender<Stanza>,
+) {
+    while let Some(stanza) = stream.next().await {
+        if tx.send(stanza).await.is_err() {
+            break;
+        }
+    }
+}
+
+pub async fn outgoing_stanza_task(
+    mut sink: impl Sink<Stanza, Error = Error> + Unpin,
+    mut rx: mpsc::Receiver<Stanza>,
+) {
+    while let Some(stanza) = rx.recv().await {
+        if sink.send(stanza).await.is_err() {
+            break;
+        }
+    }
+}