From e2f2d4a8fe90cec53724a70ea64d3530708adf6c Mon Sep 17 00:00:00 2001 From: Phillip Davis Date: Sun, 18 Jan 2026 17:12:25 -0500 Subject: [PATCH] wip: oh yeah, this might actually be good --- xmpp/Cargo.toml | 2 + xmpp/src/component.rs | 249 ------------------ xmpp/src/component/mod.rs | 77 ++++++ .../resource/component_connection.rs | 26 ++ xmpp/src/component/resource/mod.rs | 1 + xmpp/src/component/system.rs | 30 +++ xmpp/src/component/task.rs | 25 ++ 7 files changed, 161 insertions(+), 249 deletions(-) delete mode 100644 xmpp/src/component.rs create mode 100644 xmpp/src/component/mod.rs create mode 100644 xmpp/src/component/resource/component_connection.rs create mode 100644 xmpp/src/component/resource/mod.rs create mode 100644 xmpp/src/component/system.rs create mode 100644 xmpp/src/component/task.rs diff --git a/xmpp/Cargo.toml b/xmpp/Cargo.toml index 2bcc78c440ed380d7494e2fdad9a819b3e13d3a5..98c00936c4fdd31232b836814a26e78d07cffc5e 100644 --- a/xmpp/Cargo.toml +++ b/xmpp/Cargo.toml @@ -23,6 +23,8 @@ tokio-util = { version = "0.7", features = ["codec"] } # same repository dependencies tokio-xmpp = { version = "5.0", features = ["component", "insecure-tcp"], path = "../tokio-xmpp", default-features = false } thiserror = "2.0" +bevy_ecs = "0.18.0" +bevy_app = "0.18.0" [dev-dependencies] env_logger = { version = "0.11", default-features = false, features = ["auto-color", "humantime"] } diff --git a/xmpp/src/component.rs b/xmpp/src/component.rs deleted file mode 100644 index 6bff3400e34db8193ec841992d746c153f000b40..0000000000000000000000000000000000000000 --- a/xmpp/src/component.rs +++ /dev/null @@ -1,249 +0,0 @@ -use futures::stream::StreamExt; -use std::{fmt::Debug, marker::PhantomData}; - -use thiserror::Error; -use tokio_xmpp::{ - Component as TokioComponent, Stanza, - connect::ServerConnector, - jid::{BareJid, Jid}, - parsers::{ - data_forms::DataForm, - ibr::{FormQuery, LegacyQuery}, - iq::Iq, - ns, - stanza_error::{DefinedCondition, ErrorType, StanzaError}, - }, - xmlstream::Timeouts, -}; - -pub trait IbrData: Clone + Send + Sync + Sized {} - -pub trait LegacyIbrData: Clone + Send + Sync + Sized + IbrData { - fn from_legacy(query: &LegacyQuery) -> Self; -} - -pub trait DataFormIbrData: Clone + Send + Sync + Sized + IbrData { - fn from_data_form(form: &FormQuery) -> Self; -} - -pub trait IbrStore: Send + Sync { - type Data: IbrData; - - type E: std::error::Error + Send + Sync; - - fn get(&self, jid: &BareJid) -> impl Future, Self::E>>; - - fn register( - &self, - jid: &BareJid, - data: Self::Data, - ) -> impl Future>; - - fn unregister( - &self, - jid: &BareJid, - data: Self::Data, - ) -> impl Future>; -} - -// Must be empty -pub enum IbrFields { - Legacy(LegacyQuery), - DataForm(DataForm), -} - -// Must not be empty -pub enum IbrSubmission { - Legacy(LegacyQuery), - DataForm(DataForm), -} - -/// Errors that can occur during IBR operations. -#[derive(Debug)] -pub enum IbrError { - /// User is not registered (for cancel/change operations). - NotRegistered, - /// Username conflict. - Conflict, - /// Required fields missing. - NotAcceptable, - /// Unsupported submission type (e.g., legacy when only DataForm is implemented). - UnsupportedSubmissionType, - /// Storage error. - Store(Box), -} - -pub trait IbrHandler: IbrStore { - fn fields( - &self, - from: &BareJid, - status: Option, - ) -> impl Future + Send; - - fn on_register( - &self, - from: &BareJid, - submission: IbrSubmission, - ) -> impl Future>; - - fn on_cancel(&self, from: &BareJid) -> impl Future> + Send; -} - -impl IbrData for () {} - -#[derive(Debug, Error)] -pub enum IbrNotSupportedShouldBeUnreachableError { - #[error("If you see this, there is a serious bug in xmpp-rs. Please report")] - CriticalUnreachableError, -} - -pub struct IbrNotSupported {} - -impl IbrStore for IbrNotSupported { - type Data = (); - type E = IbrNotSupportedShouldBeUnreachableError; - - fn get(&self, _: &BareJid) -> impl Future, Self::E>> { - unreachable!(); - std::future::ready(Err( - IbrNotSupportedShouldBeUnreachableError::CriticalUnreachableError, - )) - } - - fn register(&self, _: &BareJid, _: Self::Data) -> impl Future> { - unreachable!(); - std::future::ready(Err( - IbrNotSupportedShouldBeUnreachableError::CriticalUnreachableError, - )) - } - - fn unregister(&self, _: &BareJid, _: Self::Data) -> impl Future> { - unreachable!(); - std::future::ready(Err( - IbrNotSupportedShouldBeUnreachableError::CriticalUnreachableError, - )) - } -} - -impl IbrHandler for IbrNotSupported {} - -#[derive(Debug)] -pub enum ComponentError { - IbrError(IbrError), -} - -struct Config -where - C: ServerConnector, - Handler: ComponentHandler, -{ - jid: Jid, - password: String, - hander: Handler, - phantom_connector: PhantomData, -} - -pub enum Continuation { - Next(Stanza), - Error(StanzaError), -} - -pub trait ComponentHandler: Sized -where - C: ServerConnector, -{ - fn handle_register() -> impl Future>; - - async fn handle_iq(&mut self, iq: Iq) -> Result { - match iq { - Iq::Get { - from, - to, - id, - payload, - } => match (payload.name(), payload.ns().as_str()) { - ("query", ns::REGISTER) => { - let fields = self.ibr_fields().await; - todo!() - } - _ => { - let err = StanzaError::new( - ErrorType::Cancel, - DefinedCondition::ServiceUnavailable, - "en", - "No handler defined for this kind of iq.", - ); - - Ok(Continuation::Error(err)) - } - }, - - _ => todo!(), - } - } -} - -struct Component -where - C: ServerConnector, - Handler: ComponentHandler, - Ibr: IbrHandler, -{ - jid: Jid, - password: String, - handler: Handler, - inner: TokioComponent, // low-level component implementation - ibr: Ibr, -} - -impl Component -where - C: ServerConnector, - Handler: ComponentHandler, - Ibr: IbrHandler, -{ - async fn run(mut self) -> Result<(), ComponentError> { - while let Some(stanza) = self.inner.next().await { - match stanza { - Stanza::Iq(iq) => { - self.handler.handle_iq(iq); - } - _ => todo!(), - } - } - - Ok(()) - } -} - -impl Config -where - C: ServerConnector, - Handler: ComponentHandler, - Ibr: IbrHandler, -{ - pub async fn run( - self, - connector: C, - timeouts: Timeouts, - handler: Handler, - ) -> Result<(), ComponentError> { - let inner_component = TokioComponent::new_with_connector( - &self.jid.as_str(), - &self.password.as_str(), - connector, - timeouts, - ) - .await - .unwrap(); // TODO: Handle this error - - let inner_component = Component { - jid: self.jid, - password: self.password, - handler, - inner: inner_component, - }; - - inner_component.run().await - } -} diff --git a/xmpp/src/component/mod.rs b/xmpp/src/component/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..75d47d9110399ae0f0be491be8cd6792ad4517cf --- /dev/null +++ b/xmpp/src/component/mod.rs @@ -0,0 +1,77 @@ +use bevy_app::{App, Plugin, Update}; +use bevy_ecs::resource::Resource; +use futures::StreamExt; +use tokio::runtime::Runtime; +use tokio::sync::mpsc; +use tokio_xmpp::{Component as TokioComponent, connect::TcpServerConnector, jid::BareJid}; + +pub mod resource; +mod system; +mod task; + +pub use resource::component_connection::{ComponentConfig, ComponentConnection, IncomingStanza}; +pub use system::receive_stanzas; + +#[derive(Resource)] +pub struct AsyncRuntime(pub Runtime); + +impl From for AsyncRuntime { + fn from(value: Runtime) -> Self { + Self(value) + } +} + +pub struct ComponentPlugin; + +impl Plugin for ComponentPlugin { + fn build(&self, app: &mut App) { + app.init_resource::() + .add_message::() + .add_systems(Update, receive_stanzas); + } +} + +pub fn setup_component_connection( + runtime: &Runtime, + component: TokioComponent, + buffer_size: usize, +) -> ComponentConnection { + let (incoming_tx, incoming_rx) = mpsc::channel(buffer_size); + let (outgoing_tx, outgoing_rx) = mpsc::channel(buffer_size); + + let jid: BareJid = component.jid.clone().into_bare(); + let (sink, stream) = component.split(); + + runtime.spawn(task::incoming_stanza_task(stream, incoming_tx)); + runtime.spawn(task::outgoing_stanza_task(sink, outgoing_rx)); + + ComponentConnection { + jid, + incoming_rx, + outgoing_tx, + } +} + +#[cfg(test)] +mod test { + use super::*; + use bevy_app::App; + + #[test] + fn api_sketching() { + let runtime = Runtime::new().unwrap(); + println!("about to connect"); + let component = runtime + .block_on(TokioComponent::new("sgxbwmsgsv2.localhost", "secret")) + .unwrap(); + let conn = setup_component_connection(&runtime, component, 256); + + println!("finished connecting"); + + App::new() + .insert_resource(conn) + .insert_resource(AsyncRuntime::from(runtime)) + .add_plugins(ComponentPlugin) + .run(); + } +} diff --git a/xmpp/src/component/resource/component_connection.rs b/xmpp/src/component/resource/component_connection.rs new file mode 100644 index 0000000000000000000000000000000000000000..9350cd24af55b269b82906218b12ab951deaf765 --- /dev/null +++ b/xmpp/src/component/resource/component_connection.rs @@ -0,0 +1,26 @@ +use bevy_ecs::{prelude::Message, resource::Resource}; +use tokio::sync::mpsc; +use tokio_xmpp::{Stanza, jid::BareJid}; + +#[derive(Resource)] +pub struct ComponentConnection { + pub jid: BareJid, + pub incoming_rx: mpsc::Receiver, + pub outgoing_tx: mpsc::Sender, +} + +#[derive(Message)] +pub struct IncomingStanza(pub Stanza); + +#[derive(Resource, Clone)] +pub struct ComponentConfig { + pub max_stanzas_per_tick: usize, +} + +impl Default for ComponentConfig { + fn default() -> Self { + Self { + max_stanzas_per_tick: 100, + } + } +} diff --git a/xmpp/src/component/resource/mod.rs b/xmpp/src/component/resource/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..bfa2d04648061589a7adf99124865e1c7fa6cde8 --- /dev/null +++ b/xmpp/src/component/resource/mod.rs @@ -0,0 +1 @@ +pub mod component_connection; diff --git a/xmpp/src/component/system.rs b/xmpp/src/component/system.rs new file mode 100644 index 0000000000000000000000000000000000000000..96657f001dddf3dab78ec8e38ef939f58698ccd5 --- /dev/null +++ b/xmpp/src/component/system.rs @@ -0,0 +1,30 @@ +use bevy_ecs::prelude::*; +use futures::stream::Empty; +use tokio::sync::mpsc::error::TryRecvError; + +use crate::component::resource::component_connection::{ + ComponentConfig, ComponentConnection, IncomingStanza, +}; + +pub fn receive_stanzas( + mut conn: ResMut, + mut messages: MessageWriter, + config: Res, +) { + let mut count = 0; + while count < config.max_stanzas_per_tick { + println!("About to pull another stanza off the stack!"); + match conn.incoming_rx.try_recv() { + Ok(stanza) => { + messages.write(IncomingStanza(stanza)); + count += 1; + } + Err(TryRecvError::Empty) => { + info!("Empty incoming stream!") + } + Err(TryRecvError::Disconnected) => { + panic!() + } + } + } +} diff --git a/xmpp/src/component/task.rs b/xmpp/src/component/task.rs new file mode 100644 index 0000000000000000000000000000000000000000..843e65a69c04f92b93fb21b6dab25c8cf723bb7e --- /dev/null +++ b/xmpp/src/component/task.rs @@ -0,0 +1,25 @@ +use futures::{Sink, SinkExt, Stream, StreamExt}; +use tokio::sync::mpsc; +use tokio_xmpp::{Error, Stanza}; + +pub async fn incoming_stanza_task( + mut stream: impl Stream + Unpin, + tx: mpsc::Sender, +) { + while let Some(stanza) = stream.next().await { + if tx.send(stanza).await.is_err() { + break; + } + } +} + +pub async fn outgoing_stanza_task( + mut sink: impl Sink + Unpin, + mut rx: mpsc::Receiver, +) { + while let Some(stanza) = rx.recv().await { + if sink.send(stanza).await.is_err() { + break; + } + } +}