diff --git a/xmpp/Cargo.toml b/xmpp/Cargo.toml index 98c00936c4fdd31232b836814a26e78d07cffc5e..0404c55023d7df26ce3ad198596e270bedca34b7 100644 --- a/xmpp/Cargo.toml +++ b/xmpp/Cargo.toml @@ -25,6 +25,7 @@ tokio-xmpp = { version = "5.0", features = ["component", "insecure-tcp"], path = thiserror = "2.0" bevy_ecs = "0.18.0" bevy_app = "0.18.0" +bevy-tokio-tasks = "0.18.0" [dev-dependencies] env_logger = { version = "0.11", default-features = false, features = ["auto-color", "humantime"] } diff --git a/xmpp/src/component/mod.rs b/xmpp/src/component/mod.rs index 75d47d9110399ae0f0be491be8cd6792ad4517cf..21b3792ef68c867431a487367e03c9bf3f711dce 100644 --- a/xmpp/src/component/mod.rs +++ b/xmpp/src/component/mod.rs @@ -1,77 +1,37 @@ -use bevy_app::{App, Plugin, Update}; -use bevy_ecs::resource::Resource; -use futures::StreamExt; -use tokio::runtime::Runtime; -use tokio::sync::mpsc; -use tokio_xmpp::{Component as TokioComponent, connect::TcpServerConnector, jid::BareJid}; - pub mod resource; mod system; -mod task; - -pub use resource::component_connection::{ComponentConfig, ComponentConnection, IncomingStanza}; -pub use system::receive_stanzas; - -#[derive(Resource)] -pub struct AsyncRuntime(pub Runtime); - -impl From for AsyncRuntime { - fn from(value: Runtime) -> Self { - Self(value) - } -} - -pub struct ComponentPlugin; - -impl Plugin for ComponentPlugin { - fn build(&self, app: &mut App) { - app.init_resource::() - .add_message::() - .add_systems(Update, receive_stanzas); - } -} - -pub fn setup_component_connection( - runtime: &Runtime, - component: TokioComponent, - buffer_size: usize, -) -> ComponentConnection { - let (incoming_tx, incoming_rx) = mpsc::channel(buffer_size); - let (outgoing_tx, outgoing_rx) = mpsc::channel(buffer_size); - - let jid: BareJid = component.jid.clone().into_bare(); - let (sink, stream) = component.split(); - - runtime.spawn(task::incoming_stanza_task(stream, incoming_tx)); - runtime.spawn(task::outgoing_stanza_task(sink, outgoing_rx)); - - ComponentConnection { - jid, - incoming_rx, - outgoing_tx, - } -} #[cfg(test)] mod test { + use crate::component::resource::{ComponentConfig, IncomingStanza, XmppComponent}; + use super::*; - use bevy_app::App; + use bevy_app::{App, RunMode, ScheduleRunnerPlugin, Startup, Update}; + use bevy_ecs::message::MessageReader; + use bevy_tokio_tasks::TokioTasksPlugin; + + fn log_incoming_stanzas(mut stanzas: MessageReader) { + for IncomingStanza(s) in stanzas.read() { + println!("Received stanza: {s:?}"); + } + } #[test] fn api_sketching() { - let runtime = Runtime::new().unwrap(); - println!("about to connect"); - let component = runtime - .block_on(TokioComponent::new("sgxbwmsgsv2.localhost", "secret")) - .unwrap(); - let conn = setup_component_connection(&runtime, component, 256); - println!("finished connecting"); App::new() - .insert_resource(conn) - .insert_resource(AsyncRuntime::from(runtime)) - .add_plugins(ComponentPlugin) + .add_plugins(ScheduleRunnerPlugin { + run_mode: RunMode::Loop { + wait: None + } + }) + .add_plugins(TokioTasksPlugin::default()) + .add_message::() + .init_resource::() + .init_resource::() + .add_systems(Startup, system::spawn_stanza_workers) + .add_systems(Update, log_incoming_stanzas) .run(); } } diff --git a/xmpp/src/component/resource/mod.rs b/xmpp/src/component/resource/mod.rs index bfa2d04648061589a7adf99124865e1c7fa6cde8..e78e08bb2440ba4b9152645f9337d9272cb35f27 100644 --- a/xmpp/src/component/resource/mod.rs +++ b/xmpp/src/component/resource/mod.rs @@ -1 +1,29 @@ -pub mod component_connection; +mod component_connection; + +use bevy_ecs::{resource::Resource, world::{FromWorld, World}}; +use bevy_tokio_tasks::TokioTasksRuntime; +use futures::{stream::SplitSink, stream::SplitStream, StreamExt}; +use tokio_xmpp::{connect::TcpServerConnector, Component, Stanza}; + +pub use component_connection::{ComponentConfig, IncomingStanza}; + +#[derive(Resource)] +pub struct XmppComponent { + pub reader: SplitStream>, + pub writer: SplitSink, Stanza>, +} + +impl FromWorld for XmppComponent { + fn from_world(world: &mut World) -> Self { + let Some(tokio_tasks) = world.get_resource_mut::() else { + panic!("Expected a TokioTasksRuntime, but it's not there") + }; + let conn = tokio_tasks.runtime().block_on(async move { + Component::new("sgxbwmsgsv2.localhost", "secret") + .await + .unwrap() + }); + let (writer, reader) = conn.split(); + XmppComponent { reader, writer } + } +} diff --git a/xmpp/src/component/system.rs b/xmpp/src/component/system.rs index 96657f001dddf3dab78ec8e38ef939f58698ccd5..1a775b7b0a5c88c4f9b84bd7c55118139aa668e6 100644 --- a/xmpp/src/component/system.rs +++ b/xmpp/src/component/system.rs @@ -1,30 +1,45 @@ -use bevy_ecs::prelude::*; -use futures::stream::Empty; -use tokio::sync::mpsc::error::TryRecvError; +use bevy_ecs::prelude::Messages; +use bevy_ecs::world::World; +use bevy_tokio_tasks::TokioTasksRuntime; +use futures::{SinkExt, StreamExt}; -use crate::component::resource::component_connection::{ - ComponentConfig, ComponentConnection, IncomingStanza, -}; +use crate::component::resource::{ComponentConfig, IncomingStanza, XmppComponent}; -pub fn receive_stanzas( - mut conn: ResMut, - mut messages: MessageWriter, - config: Res, -) { - let mut count = 0; - while count < config.max_stanzas_per_tick { - println!("About to pull another stanza off the stack!"); - match conn.incoming_rx.try_recv() { - Ok(stanza) => { - messages.write(IncomingStanza(stanza)); - count += 1; - } - Err(TryRecvError::Empty) => { - info!("Empty incoming stream!") - } - Err(TryRecvError::Disconnected) => { - panic!() - } +pub fn spawn_stanza_workers(world: &mut World) { + let Some(xmpp) = world.remove_resource::() else { + panic!("Expected XmppComponent resource to exist"); + }; + let XmppComponent { reader, writer } = xmpp; + + let batch_size = world + .get_resource::() + .map(|c| c.max_stanzas_per_tick) + .unwrap_or(100); + + let Some(runtime) = world.get_resource::() else { + panic!("Expected TokioTasksRuntime resource to exist"); + }; + + runtime.spawn_background_task(move |mut ctx| async move { + let mut chunked = reader.ready_chunks(batch_size); + + while let Some(stanzas) = chunked.next().await { + ctx.run_on_main_thread(move |main_ctx| { + main_ctx + .world + .resource_mut::>() + .write_batch(stanzas.into_iter().map(IncomingStanza)); + }) + .await; } - } + }); + + runtime.spawn_background_task(|mut ctx| async move { + let mut writer = writer; + ctx.run_on_main_thread(move |_main_ctx| { + println!("Writer task started"); + }) + .await; + let _ = writer.flush().await; + }); } diff --git a/xmpp/src/component/task.rs b/xmpp/src/component/task.rs deleted file mode 100644 index 843e65a69c04f92b93fb21b6dab25c8cf723bb7e..0000000000000000000000000000000000000000 --- a/xmpp/src/component/task.rs +++ /dev/null @@ -1,25 +0,0 @@ -use futures::{Sink, SinkExt, Stream, StreamExt}; -use tokio::sync::mpsc; -use tokio_xmpp::{Error, Stanza}; - -pub async fn incoming_stanza_task( - mut stream: impl Stream + Unpin, - tx: mpsc::Sender, -) { - while let Some(stanza) = stream.next().await { - if tx.send(stanza).await.is_err() { - break; - } - } -} - -pub async fn outgoing_stanza_task( - mut sink: impl Sink + Unpin, - mut rx: mpsc::Receiver, -) { - while let Some(stanza) = rx.recv().await { - if sink.send(stanza).await.is_err() { - break; - } - } -}