Move wait_for_events to event_loop module

xmppftw@kl.netlib.re created

Change summary

xmpp/src/event_loop.rs | 77 ++++++++++++++++++++++++++++++++++++++++++++
xmpp/src/lib.rs        | 62 ++---------------------------------
2 files changed, 81 insertions(+), 58 deletions(-)

Detailed changes

xmpp/src/event_loop.rs 🔗

@@ -0,0 +1,77 @@
+// Copyright (c) 2023 xmpp-rs contributors.
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+use futures::StreamExt;
+use tokio_xmpp::{
+    parsers::{
+        disco::DiscoInfoQuery, iq::Iq, message::Message, presence::Presence, roster::Roster,
+    },
+    Event as TokioXmppEvent,
+};
+
+use crate::{iq, message, presence, Agent, Event};
+
+/// Wait for new events.
+///
+/// # Returns
+///
+/// - `Some(events)` if there are new events; multiple may be returned at once.
+/// - `None` if the underlying stream is closed.
+pub async fn wait_for_events(agent: &mut Agent) -> Option<Vec<Event>> {
+    if let Some(event) = agent.client.next().await {
+        let mut events = Vec::new();
+
+        match event {
+            TokioXmppEvent::Online { resumed: false, .. } => {
+                let presence = Agent::make_initial_presence(&agent.disco, &agent.node).into();
+                let _ = agent.client.send_stanza(presence).await;
+                events.push(Event::Online);
+                // TODO: only send this when the ContactList feature is enabled.
+                let iq = Iq::from_get(
+                    "roster",
+                    Roster {
+                        ver: None,
+                        items: vec![],
+                    },
+                )
+                .into();
+                let _ = agent.client.send_stanza(iq).await;
+
+                // Query account disco to know what bookmarks spec is used
+                let iq = Iq::from_get("disco-account", DiscoInfoQuery { node: None }).into();
+                let _ = agent.client.send_stanza(iq).await;
+                agent.awaiting_disco_bookmarks_type = true;
+            }
+            TokioXmppEvent::Online { resumed: true, .. } => {}
+            TokioXmppEvent::Disconnected(e) => {
+                events.push(Event::Disconnected(e));
+            }
+            TokioXmppEvent::Stanza(elem) => {
+                if elem.is("iq", "jabber:client") {
+                    let iq = Iq::try_from(elem).unwrap();
+                    let new_events = iq::handle_iq(agent, iq).await;
+                    events.extend(new_events);
+                } else if elem.is("message", "jabber:client") {
+                    let message = Message::try_from(elem).unwrap();
+                    let new_events = message::handle_message(agent, message).await;
+                    events.extend(new_events);
+                } else if elem.is("presence", "jabber:client") {
+                    let presence = Presence::try_from(elem).unwrap();
+                    let new_events = presence::handle_presence(agent, presence).await;
+                    events.extend(new_events);
+                } else if elem.is("error", "http://etherx.jabber.org/streams") {
+                    println!("Received a fatal stream error: {}", String::from(&elem));
+                } else {
+                    panic!("Unknown stanza: {}", String::from(&elem));
+                }
+            }
+        }
+
+        Some(events)
+    } else {
+        None
+    }
+}

xmpp/src/lib.rs 🔗

@@ -6,21 +6,18 @@
 
 #![deny(bare_trait_objects)]
 
-use futures::stream::StreamExt;
 use std::path::{Path, PathBuf};
 use std::sync::{Arc, RwLock};
 pub use tokio_xmpp::parsers;
 use tokio_xmpp::parsers::{
     caps::{compute_disco, hash_caps, Caps},
-    disco::{DiscoInfoQuery, DiscoInfoResult},
+    disco::DiscoInfoResult,
     hashes::Algo,
-    iq::Iq,
     message::{Body, Message, MessageType},
     muc::{user::MucUser, Muc},
     presence::{Presence, Type as PresenceType},
-    roster::Roster,
 };
-use tokio_xmpp::{AsyncClient as TokioXmppClient, Event as TokioXmppEvent};
+use tokio_xmpp::AsyncClient as TokioXmppClient;
 pub use tokio_xmpp::{BareJid, Element, FullJid, Jid};
 #[macro_use]
 extern crate log;
@@ -28,6 +25,7 @@ extern crate log;
 pub mod builder;
 pub mod disco;
 pub mod event;
+pub mod event_loop;
 pub mod feature;
 pub mod iq;
 pub mod message;
@@ -177,59 +175,7 @@ impl Agent {
     /// - `Some(events)` if there are new events; multiple may be returned at once.
     /// - `None` if the underlying stream is closed.
     pub async fn wait_for_events(&mut self) -> Option<Vec<Event>> {
-        if let Some(event) = self.client.next().await {
-            let mut events = Vec::new();
-
-            match event {
-                TokioXmppEvent::Online { resumed: false, .. } => {
-                    let presence = Self::make_initial_presence(&self.disco, &self.node).into();
-                    let _ = self.client.send_stanza(presence).await;
-                    events.push(Event::Online);
-                    // TODO: only send this when the ContactList feature is enabled.
-                    let iq = Iq::from_get(
-                        "roster",
-                        Roster {
-                            ver: None,
-                            items: vec![],
-                        },
-                    )
-                    .into();
-                    let _ = self.client.send_stanza(iq).await;
-
-                    // Query account disco to know what bookmarks spec is used
-                    let iq = Iq::from_get("disco-account", DiscoInfoQuery { node: None }).into();
-                    let _ = self.client.send_stanza(iq).await;
-                    self.awaiting_disco_bookmarks_type = true;
-                }
-                TokioXmppEvent::Online { resumed: true, .. } => {}
-                TokioXmppEvent::Disconnected(e) => {
-                    events.push(Event::Disconnected(e));
-                }
-                TokioXmppEvent::Stanza(elem) => {
-                    if elem.is("iq", "jabber:client") {
-                        let iq = Iq::try_from(elem).unwrap();
-                        let new_events = iq::handle_iq(self, iq).await;
-                        events.extend(new_events);
-                    } else if elem.is("message", "jabber:client") {
-                        let message = Message::try_from(elem).unwrap();
-                        let new_events = message::handle_message(self, message).await;
-                        events.extend(new_events);
-                    } else if elem.is("presence", "jabber:client") {
-                        let presence = Presence::try_from(elem).unwrap();
-                        let new_events = presence::handle_presence(self, presence).await;
-                        events.extend(new_events);
-                    } else if elem.is("error", "http://etherx.jabber.org/streams") {
-                        println!("Received a fatal stream error: {}", String::from(&elem));
-                    } else {
-                        panic!("Unknown stanza: {}", String::from(&elem));
-                    }
-                }
-            }
-
-            Some(events)
-        } else {
-            None
-        }
+        event_loop::wait_for_events(self).await
     }
 
     pub async fn upload_file_with(&mut self, service: &str, path: &Path) {