system.rs

 1use bevy_ecs::world::World;
 2use bevy_tokio_tasks::TokioTasksRuntime;
 3use futures::{SinkExt, StreamExt};
 4
 5use crate::component::resource::{ComponentConfig, XmppComponent};
 6use crate::component::stanza_matcher::{MatcherRegistry, StanzaDispatch};
 7
 8pub(crate) fn spawn_stanza_workers<D: StanzaDispatch>(world: &mut World) {
 9    let Some(xmpp) = world.remove_resource::<XmppComponent>() else {
10        panic!("Expected XmppComponent resource to exist");
11    };
12    let XmppComponent { reader, writer } = xmpp;
13
14    let registry = world
15        .remove_resource::<MatcherRegistry<D>>()
16        .expect("MatcherRegistry should exist");
17    let mut dispatchers = Box::new(registry.dispatchers);
18
19    let batch_size = world
20        .get_resource::<ComponentConfig>()
21        .map(|c| c.max_stanzas_per_tick)
22        .unwrap_or(100);
23
24    let Some(runtime) = world.get_resource::<TokioTasksRuntime>() else {
25        panic!("Expected TokioTasksRuntime resource to exist");
26    };
27
28    runtime.spawn_background_task(move |mut ctx| async move {
29        let mut chunked = reader.ready_chunks(batch_size);
30
31        while let Some(stanzas) = chunked.next().await {
32            dispatchers = ctx
33                .run_on_main_thread(move |main_ctx| {
34                    for stanza in stanzas {
35                        dispatchers.dispatch(&stanza, main_ctx.world);
36                    }
37                    dispatchers
38                })
39                .await;
40        }
41    });
42
43    runtime.spawn_background_task(|mut ctx| async move {
44        let mut writer = writer;
45        ctx.run_on_main_thread(move |_main_ctx| {
46            println!("Writer task started");
47        })
48        .await;
49        let _ = writer.flush().await;
50    });
51}