1use bevy_ecs::world::World;
2use bevy_tokio_tasks::TokioTasksRuntime;
3use futures::{SinkExt, StreamExt};
4
5use crate::component::resource::{ComponentConfig, XmppComponent};
6use crate::component::stanza_matcher::{MatcherRegistry, StanzaDispatch};
7
8pub(crate) fn spawn_stanza_workers<D: StanzaDispatch>(world: &mut World) {
9 let Some(xmpp) = world.remove_resource::<XmppComponent>() else {
10 panic!("Expected XmppComponent resource to exist");
11 };
12 let XmppComponent { reader, writer } = xmpp;
13
14 let registry = world
15 .remove_resource::<MatcherRegistry<D>>()
16 .expect("MatcherRegistry should exist");
17 let mut dispatchers = Box::new(registry.dispatchers);
18
19 let batch_size = world
20 .get_resource::<ComponentConfig>()
21 .map(|c| c.max_stanzas_per_tick)
22 .unwrap_or(100);
23
24 let Some(runtime) = world.get_resource::<TokioTasksRuntime>() else {
25 panic!("Expected TokioTasksRuntime resource to exist");
26 };
27
28 runtime.spawn_background_task(move |mut ctx| async move {
29 let mut chunked = reader.ready_chunks(batch_size);
30
31 while let Some(stanzas) = chunked.next().await {
32 dispatchers = ctx
33 .run_on_main_thread(move |main_ctx| {
34 for stanza in stanzas {
35 dispatchers.dispatch(&stanza, main_ctx.world);
36 }
37 dispatchers
38 })
39 .await;
40 }
41 });
42
43 runtime.spawn_background_task(|mut ctx| async move {
44 let mut writer = writer;
45 ctx.run_on_main_thread(move |_main_ctx| {
46 println!("Writer task started");
47 })
48 .await;
49 let _ = writer.flush().await;
50 });
51}