wip: better runtime integration

Phillip Davis created

Change summary

xmpp/Cargo.toml                    |  1 
xmpp/src/component/mod.rs          | 84 ++++++++-----------------------
xmpp/src/component/resource/mod.rs | 30 +++++++++++
xmpp/src/component/system.rs       | 67 +++++++++++++++---------
xmpp/src/component/task.rs         | 25 ---------
5 files changed, 93 insertions(+), 114 deletions(-)

Detailed changes

xmpp/Cargo.toml 🔗

@@ -25,6 +25,7 @@ tokio-xmpp = { version = "5.0", features = ["component", "insecure-tcp"], path =
 thiserror = "2.0"
 bevy_ecs = "0.18.0"
 bevy_app = "0.18.0"
+bevy-tokio-tasks = "0.18.0"
 
 [dev-dependencies]
 env_logger = { version = "0.11", default-features = false, features = ["auto-color", "humantime"] }

xmpp/src/component/mod.rs 🔗

@@ -1,77 +1,37 @@
-use bevy_app::{App, Plugin, Update};
-use bevy_ecs::resource::Resource;
-use futures::StreamExt;
-use tokio::runtime::Runtime;
-use tokio::sync::mpsc;
-use tokio_xmpp::{Component as TokioComponent, connect::TcpServerConnector, jid::BareJid};
-
 pub mod resource;
 mod system;
-mod task;
-
-pub use resource::component_connection::{ComponentConfig, ComponentConnection, IncomingStanza};
-pub use system::receive_stanzas;
-
-#[derive(Resource)]
-pub struct AsyncRuntime(pub Runtime);
-
-impl From<Runtime> for AsyncRuntime {
-    fn from(value: Runtime) -> Self {
-        Self(value)
-    }
-}
-
-pub struct ComponentPlugin;
-
-impl Plugin for ComponentPlugin {
-    fn build(&self, app: &mut App) {
-        app.init_resource::<ComponentConfig>()
-            .add_message::<IncomingStanza>()
-            .add_systems(Update, receive_stanzas);
-    }
-}
-
-pub fn setup_component_connection(
-    runtime: &Runtime,
-    component: TokioComponent<TcpServerConnector>,
-    buffer_size: usize,
-) -> ComponentConnection {
-    let (incoming_tx, incoming_rx) = mpsc::channel(buffer_size);
-    let (outgoing_tx, outgoing_rx) = mpsc::channel(buffer_size);
-
-    let jid: BareJid = component.jid.clone().into_bare();
-    let (sink, stream) = component.split();
-
-    runtime.spawn(task::incoming_stanza_task(stream, incoming_tx));
-    runtime.spawn(task::outgoing_stanza_task(sink, outgoing_rx));
-
-    ComponentConnection {
-        jid,
-        incoming_rx,
-        outgoing_tx,
-    }
-}
 
 #[cfg(test)]
 mod test {
+    use crate::component::resource::{ComponentConfig, IncomingStanza, XmppComponent};
+
     use super::*;
-    use bevy_app::App;
+    use bevy_app::{App, RunMode, ScheduleRunnerPlugin, Startup, Update};
+    use bevy_ecs::message::MessageReader;
+    use bevy_tokio_tasks::TokioTasksPlugin;
+
+    fn log_incoming_stanzas(mut stanzas: MessageReader<IncomingStanza>) {
+        for IncomingStanza(s) in stanzas.read() {
+            println!("Received stanza: {s:?}");
+        }
+    }
 
     #[test]
     fn api_sketching() {
-        let runtime = Runtime::new().unwrap();
-        println!("about to connect");
-        let component = runtime
-            .block_on(TokioComponent::new("sgxbwmsgsv2.localhost", "secret"))
-            .unwrap();
-        let conn = setup_component_connection(&runtime, component, 256);
-
         println!("finished connecting");
 
         App::new()
-            .insert_resource(conn)
-            .insert_resource(AsyncRuntime::from(runtime))
-            .add_plugins(ComponentPlugin)
+            .add_plugins(ScheduleRunnerPlugin {
+				run_mode: RunMode::Loop {
+					wait: None
+				}
+			})
+            .add_plugins(TokioTasksPlugin::default())
+            .add_message::<IncomingStanza>()
+            .init_resource::<ComponentConfig>()
+            .init_resource::<XmppComponent>()
+            .add_systems(Startup, system::spawn_stanza_workers)
+            .add_systems(Update, log_incoming_stanzas)
             .run();
     }
 }

xmpp/src/component/resource/mod.rs 🔗

@@ -1 +1,29 @@
-pub mod component_connection;
+mod component_connection;
+
+use bevy_ecs::{resource::Resource, world::{FromWorld, World}};
+use bevy_tokio_tasks::TokioTasksRuntime;
+use futures::{stream::SplitSink, stream::SplitStream, StreamExt};
+use tokio_xmpp::{connect::TcpServerConnector, Component, Stanza};
+
+pub use component_connection::{ComponentConfig, IncomingStanza};
+
+#[derive(Resource)]
+pub struct XmppComponent {
+    pub reader: SplitStream<Component<TcpServerConnector>>,
+    pub writer: SplitSink<Component<TcpServerConnector>, Stanza>,
+}
+
+impl FromWorld for XmppComponent {
+    fn from_world(world: &mut World) -> Self {
+        let Some(tokio_tasks) = world.get_resource_mut::<TokioTasksRuntime>() else {
+            panic!("Expected a TokioTasksRuntime, but it's not there")
+        };
+        let conn = tokio_tasks.runtime().block_on(async move {
+            Component::new("sgxbwmsgsv2.localhost", "secret")
+                .await
+                .unwrap()
+        });
+        let (writer, reader) = conn.split();
+        XmppComponent { reader, writer }
+    }
+}

xmpp/src/component/system.rs 🔗

@@ -1,30 +1,45 @@
-use bevy_ecs::prelude::*;
-use futures::stream::Empty;
-use tokio::sync::mpsc::error::TryRecvError;
+use bevy_ecs::prelude::Messages;
+use bevy_ecs::world::World;
+use bevy_tokio_tasks::TokioTasksRuntime;
+use futures::{SinkExt, StreamExt};
 
-use crate::component::resource::component_connection::{
-    ComponentConfig, ComponentConnection, IncomingStanza,
-};
+use crate::component::resource::{ComponentConfig, IncomingStanza, XmppComponent};
 
-pub fn receive_stanzas(
-    mut conn: ResMut<ComponentConnection>,
-    mut messages: MessageWriter<IncomingStanza>,
-    config: Res<ComponentConfig>,
-) {
-    let mut count = 0;
-    while count < config.max_stanzas_per_tick {
-        println!("About to pull another stanza off the stack!");
-        match conn.incoming_rx.try_recv() {
-            Ok(stanza) => {
-                messages.write(IncomingStanza(stanza));
-                count += 1;
-            }
-            Err(TryRecvError::Empty) => {
-                info!("Empty incoming stream!")
-            }
-            Err(TryRecvError::Disconnected) => {
-                panic!()
-            }
+pub fn spawn_stanza_workers(world: &mut World) {
+    let Some(xmpp) = world.remove_resource::<XmppComponent>() else {
+        panic!("Expected XmppComponent resource to exist");
+    };
+    let XmppComponent { reader, writer } = xmpp;
+
+    let batch_size = world
+        .get_resource::<ComponentConfig>()
+        .map(|c| c.max_stanzas_per_tick)
+        .unwrap_or(100);
+
+    let Some(runtime) = world.get_resource::<TokioTasksRuntime>() else {
+        panic!("Expected TokioTasksRuntime resource to exist");
+    };
+
+    runtime.spawn_background_task(move |mut ctx| async move {
+        let mut chunked = reader.ready_chunks(batch_size);
+
+        while let Some(stanzas) = chunked.next().await {
+            ctx.run_on_main_thread(move |main_ctx| {
+                main_ctx
+                    .world
+                    .resource_mut::<Messages<IncomingStanza>>()
+                    .write_batch(stanzas.into_iter().map(IncomingStanza));
+            })
+            .await;
         }
-    }
+    });
+
+    runtime.spawn_background_task(|mut ctx| async move {
+        let mut writer = writer;
+        ctx.run_on_main_thread(move |_main_ctx| {
+            println!("Writer task started");
+        })
+        .await;
+        let _ = writer.flush().await;
+    });
 }

xmpp/src/component/task.rs 🔗

@@ -1,25 +0,0 @@
-use futures::{Sink, SinkExt, Stream, StreamExt};
-use tokio::sync::mpsc;
-use tokio_xmpp::{Error, Stanza};
-
-pub async fn incoming_stanza_task(
-    mut stream: impl Stream<Item = Stanza> + Unpin,
-    tx: mpsc::Sender<Stanza>,
-) {
-    while let Some(stanza) = stream.next().await {
-        if tx.send(stanza).await.is_err() {
-            break;
-        }
-    }
-}
-
-pub async fn outgoing_stanza_task(
-    mut sink: impl Sink<Stanza, Error = Error> + Unpin,
-    mut rx: mpsc::Receiver<Stanza>,
-) {
-    while let Some(stanza) = rx.recv().await {
-        if sink.send(stanza).await.is_err() {
-            break;
-        }
-    }
-}