tokio-xmpp: update for futures-0.3 (100% API breakage)

Astro created

Change summary

xmpp-rs/Cargo.toml            |   6 
xmpp-rs/examples/hello_bot.rs | 115 ++++-----
xmpp-rs/src/lib.rs            | 413 ++++++++++++++++--------------------
xmpp-rs/src/pubsub/avatar.rs  |  11 
xmpp-rs/src/pubsub/mod.rs     |  12 
5 files changed, 249 insertions(+), 308 deletions(-)

Detailed changes

xmpp-rs/Cargo.toml πŸ”—

@@ -14,10 +14,10 @@ license = "MPL-2.0"
 edition = "2018"
 
 [dependencies]
-tokio-xmpp = "1.0.1"
+tokio-xmpp = "2.0.0"
 xmpp-parsers = "0.17"
-futures = "0.1"
-tokio = "0.1"
+futures = "0.3"
+tokio = "0.2"
 log = "0.4"
 
 [features]

xmpp-rs/examples/hello_bot.rs πŸ”—

@@ -4,27 +4,22 @@
 // License, v. 2.0. If a copy of the MPL was not distributed with this
 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
-use futures::prelude::*;
 use std::env::args;
-use std::process::exit;
-use tokio::runtime::current_thread::Runtime;
 use xmpp::{ClientBuilder, ClientFeature, ClientType, Event};
 use xmpp_parsers::{message::MessageType, Jid};
 
-fn main() {
+#[tokio::main]
+async fn main() -> Result<(), Option<()>> {
     let args: Vec<String> = args().collect();
     if args.len() != 3 {
         println!("Usage: {} <jid> <password>", args[0]);
-        exit(1);
+        return Err(None);
     }
     let jid = &args[1];
     let password = &args[2];
 
-    // tokio_core context
-    let mut rt = Runtime::new().unwrap();
-
     // Client instance
-    let (mut agent, stream) = ClientBuilder::new(jid, password)
+    let mut client = ClientBuilder::new(jid, password)
         .set_client(ClientType::Bot, "xmpp-rs")
         .set_website("https://gitlab.com/xmpp-rs/xmpp-rs")
         .set_default_nick("bot")
@@ -34,58 +29,58 @@ fn main() {
         .build()
         .unwrap();
 
-    // We return either Some(Error) if an error was encountered
-    // or None, if we were simply disconnected
-    let handler = stream.map_err(Some).for_each(|evt: Event| {
-        match evt {
-            Event::Online => {
-                println!("Online.");
-            }
-            Event::Disconnected => {
-                println!("Disconnected.");
-                return Err(None);
-            }
-            Event::ContactAdded(contact) => {
-                println!("Contact {} added.", contact.jid);
-            }
-            Event::ContactRemoved(contact) => {
-                println!("Contact {} removed.", contact.jid);
-            }
-            Event::ContactChanged(contact) => {
-                println!("Contact {} changed.", contact.jid);
-            }
-            Event::JoinRoom(jid, conference) => {
-                println!("Joining room {} ({:?})…", jid, conference.name);
-                agent.join_room(
-                    jid,
-                    conference.nick,
-                    conference.password,
-                    "en",
-                    "Yet another bot!",
-                );
-            }
-            Event::LeaveRoom(jid) => {
-                println!("Leaving room {}…", jid);
-            }
-            Event::LeaveAllRooms => {
-                println!("Leaving all rooms…");
-            }
-            Event::RoomJoined(jid) => {
-                println!("Joined room {}.", jid);
-                agent.send_message(Jid::Bare(jid), MessageType::Groupchat, "en", "Hello world!");
-            }
-            Event::RoomLeft(jid) => {
-                println!("Left room {}.", jid);
-            }
-            Event::AvatarRetrieved(jid, path) => {
-                println!("Received avatar for {} in {}.", jid, path);
+    while let Some(events) = client.wait_for_events().await {
+        for event in events {
+            match event {
+                Event::Online => {
+                    println!("Online.");
+                }
+                Event::Disconnected => {
+                    println!("Disconnected");
+                    return Err(None);
+                }
+                Event::ContactAdded(contact) => {
+                    println!("Contact {} added.", contact.jid);
+                }
+                Event::ContactRemoved(contact) => {
+                    println!("Contact {} removed.", contact.jid);
+                }
+                Event::ContactChanged(contact) => {
+                    println!("Contact {} changed.", contact.jid);
+                }
+                Event::JoinRoom(jid, conference) => {
+                    println!("Joining room {} ({:?})…", jid, conference.name);
+                    client
+                        .join_room(
+                            jid,
+                            conference.nick,
+                            conference.password,
+                            "en",
+                            "Yet another bot!",
+                        )
+                        .await;
+                }
+                Event::LeaveRoom(jid) => {
+                    println!("Leaving room {}…", jid);
+                }
+                Event::LeaveAllRooms => {
+                    println!("Leaving all rooms…");
+                }
+                Event::RoomJoined(jid) => {
+                    println!("Joined room {}.", jid);
+                    client
+                        .send_message(Jid::Bare(jid), MessageType::Groupchat, "en", "Hello world!")
+                        .await;
+                }
+                Event::RoomLeft(jid) => {
+                    println!("Left room {}.", jid);
+                }
+                Event::AvatarRetrieved(jid, path) => {
+                    println!("Received avatar for {} in {}.", jid, path);
+                }
             }
         }
-        Ok(())
-    });
+    }
 
-    rt.block_on(handler).unwrap_or_else(|e| match e {
-        Some(e) => println!("Error: {:?}", e),
-        None => println!("Disconnected."),
-    });
+    Ok(())
 }

xmpp-rs/src/lib.rs πŸ”—

@@ -6,12 +6,11 @@
 
 #![deny(bare_trait_objects)]
 
-use futures::{sync::mpsc, Future, Sink, Stream};
+use futures::stream::StreamExt;
 use std::cell::RefCell;
 use std::convert::TryFrom;
 use std::rc::Rc;
-use std::str::FromStr;
-use tokio_xmpp::{Client as TokioXmppClient, Event as TokioXmppEvent, Packet};
+use tokio_xmpp::{Client as TokioXmppClient, Event as TokioXmppEvent};
 use xmpp_parsers::{
     bookmarks2::Conference,
     caps::{compute_disco, hash_caps, Caps},
@@ -28,7 +27,7 @@ use xmpp_parsers::{
     pubsub::pubsub::{Items, PubSub},
     roster::{Item as RosterItem, Roster},
     stanza_error::{DefinedCondition, ErrorType, StanzaError},
-    BareJid, FullJid, Jid, JidParseError,
+    BareJid, FullJid, Jid,
 };
 #[macro_use]
 extern crate log;
@@ -149,225 +148,36 @@ impl ClientBuilder<'_> {
         }
     }
 
-    fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
-        let caps_data = compute_disco(disco);
-        let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
-        let caps = Caps::new(node, hash);
-
-        let mut presence = Presence::new(PresenceType::None);
-        presence.add_payload(caps);
-        presence
-    }
-
-    pub fn build(
-        self,
-    ) -> Result<(Agent, impl Stream<Item = Event, Error = tokio_xmpp::Error>), JidParseError> {
+    pub fn build(self) -> Result<Agent, Error> {
         let client = TokioXmppClient::new(self.jid, self.password)?;
-        let (sender_tx, sender_rx) = mpsc::unbounded();
-        Ok(self.build_impl(client, sender_tx, sender_rx)?)
+        Ok(self.build_impl(client)?)
     }
 
     // This function is meant to be used for testing build
-    pub(crate) fn build_impl<S>(
-        self,
-        stream: S,
-        sender_tx: mpsc::UnboundedSender<Packet>,
-        sender_rx: mpsc::UnboundedReceiver<Packet>,
-    ) -> Result<(Agent, impl Stream<Item = Event, Error = tokio_xmpp::Error>), JidParseError>
-    where
-        S: Stream<Item = tokio_xmpp::Event, Error = tokio_xmpp::Error>
-            + Sink<SinkItem = tokio_xmpp::Packet, SinkError = tokio_xmpp::Error>,
-    {
+    pub(crate) fn build_impl(self, client: TokioXmppClient) -> Result<Agent, Error> {
         let disco = self.make_disco();
         let node = self.website;
 
-        let client = stream;
-        let (sink, stream) = client.split();
-
-        let reader = {
-            let mut sender_tx = sender_tx.clone();
-            let jid = self.jid.to_owned();
-            stream
-                .map(move |event| {
-                    // Helper function to send an iq error.
-                    let mut events = Vec::new();
-                    let send_error = |to, id, type_, condition, text: &str| {
-                        let error = StanzaError::new(type_, condition, "en", text);
-                        let iq = Iq::from_error(id, error).with_to(to).into();
-                        sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
-                    };
-
-                    match event {
-                        TokioXmppEvent::Online(_) => {
-                            let presence =
-                                ClientBuilder::make_initial_presence(&disco, &node).into();
-                            let packet = Packet::Stanza(presence);
-                            sender_tx.unbounded_send(packet).unwrap();
-                            events.push(Event::Online);
-                            // TODO: only send this when the ContactList feature is enabled.
-                            let iq = Iq::from_get(
-                                "roster",
-                                Roster {
-                                    ver: None,
-                                    items: vec![],
-                                },
-                            )
-                            .into();
-                            sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
-                            // TODO: only send this when the JoinRooms feature is enabled.
-                            let iq = Iq::from_get(
-                                "bookmarks",
-                                PubSub::Items(Items::new(ns::BOOKMARKS2)),
-                            )
-                            .into();
-                            sender_tx.unbounded_send(Packet::Stanza(iq)).unwrap();
-                        }
-                        TokioXmppEvent::Disconnected => {
-                            events.push(Event::Disconnected);
-                        }
-                        TokioXmppEvent::Stanza(stanza) => {
-                            if stanza.is("iq", "jabber:client") {
-                                let iq = Iq::try_from(stanza).unwrap();
-                                let from = iq
-                                    .from
-                                    .clone()
-                                    .unwrap_or_else(|| Jid::from_str(&jid).unwrap());
-                                if let IqType::Get(payload) = iq.payload {
-                                    if payload.is("query", ns::DISCO_INFO) {
-                                        let query = DiscoInfoQuery::try_from(payload);
-                                        match query {
-                                            Ok(query) => {
-                                                let mut disco_info = disco.clone();
-                                                disco_info.node = query.node;
-                                                let iq = Iq::from_result(iq.id, Some(disco_info))
-                                                    .with_to(iq.from.unwrap())
-                                                    .into();
-                                                sender_tx
-                                                    .unbounded_send(Packet::Stanza(iq))
-                                                    .unwrap();
-                                            }
-                                            Err(err) => {
-                                                send_error(
-                                                    iq.from.unwrap(),
-                                                    iq.id,
-                                                    ErrorType::Modify,
-                                                    DefinedCondition::BadRequest,
-                                                    &format!("{}", err),
-                                                );
-                                            }
-                                        }
-                                    } else {
-                                        // We MUST answer unhandled get iqs with a service-unavailable error.
-                                        send_error(
-                                            iq.from.unwrap(),
-                                            iq.id,
-                                            ErrorType::Cancel,
-                                            DefinedCondition::ServiceUnavailable,
-                                            "No handler defined for this kind of iq.",
-                                        );
-                                    }
-                                } else if let IqType::Result(Some(payload)) = iq.payload {
-                                    // TODO: move private iqs like this one somewhere else, for
-                                    // security reasons.
-                                    if payload.is("query", ns::ROSTER) && iq.from.is_none() {
-                                        let roster = Roster::try_from(payload).unwrap();
-                                        for item in roster.items.into_iter() {
-                                            events.push(Event::ContactAdded(item));
-                                        }
-                                    } else if payload.is("pubsub", ns::PUBSUB) {
-                                        let new_events = pubsub::handle_iq_result(&from, payload);
-                                        events.extend(new_events);
-                                    }
-                                } else if let IqType::Set(_) = iq.payload {
-                                    // We MUST answer unhandled set iqs with a service-unavailable error.
-                                    send_error(
-                                        iq.from.unwrap(),
-                                        iq.id,
-                                        ErrorType::Cancel,
-                                        DefinedCondition::ServiceUnavailable,
-                                        "No handler defined for this kind of iq.",
-                                    );
-                                }
-                            } else if stanza.is("message", "jabber:client") {
-                                let message = Message::try_from(stanza).unwrap();
-                                let from = message.from.clone().unwrap();
-                                for child in message.payloads {
-                                    if child.is("event", ns::PUBSUB_EVENT) {
-                                        let new_events =
-                                            pubsub::handle_event(&from, child, &mut sender_tx);
-                                        events.extend(new_events);
-                                    }
-                                }
-                            } else if stanza.is("presence", "jabber:client") {
-                                let presence = Presence::try_from(stanza).unwrap();
-                                let from: BareJid = match presence.from.clone().unwrap() {
-                                    Jid::Full(FullJid { node, domain, .. }) => {
-                                        BareJid { node, domain }
-                                    }
-                                    Jid::Bare(bare) => bare,
-                                };
-                                for payload in presence.payloads.into_iter() {
-                                    let muc_user = match MucUser::try_from(payload) {
-                                        Ok(muc_user) => muc_user,
-                                        _ => continue,
-                                    };
-                                    for status in muc_user.status.into_iter() {
-                                        if status == Status::SelfPresence {
-                                            events.push(Event::RoomJoined(from.clone()));
-                                            break;
-                                        }
-                                    }
-                                }
-                            } else if stanza.is("error", "http://etherx.jabber.org/streams") {
-                                println!(
-                                    "Received a fatal stream error: {}",
-                                    String::from(&stanza)
-                                );
-                            } else {
-                                panic!("Unknown stanza: {}", String::from(&stanza));
-                            }
-                        }
-                    }
-
-                    futures::stream::iter_ok(events)
-                })
-                .flatten()
-        };
-
-        let sender = sender_rx
-            .map_err(|e| panic!("Sink error: {:?}", e))
-            .forward(sink)
-            .map(|(rx, mut sink)| {
-                drop(rx);
-                let _ = sink.close();
-                None
-            });
-
-        // TODO is this correct?
-        // Some(Error) means a real error
-        // None means the end of the sender stream and can be ignored
-        let future = reader
-            .map(Some)
-            .select(sender.into_stream())
-            .filter_map(|x| x);
-
         let agent = Agent {
-            sender_tx,
+            client,
             default_nick: Rc::new(RefCell::new(self.default_nick)),
+            disco,
+            node,
         };
 
-        Ok((agent, future))
+        Ok(agent)
     }
 }
 
-#[derive(Clone, Debug)]
 pub struct Agent {
-    sender_tx: mpsc::UnboundedSender<Packet>,
+    client: TokioXmppClient,
     default_nick: Rc<RefCell<String>>,
+    disco: DiscoInfoResult,
+    node: String,
 }
 
 impl Agent {
-    pub fn join_room(
+    pub async fn join_room(
         &mut self,
         room: BareJid,
         nick: Option<String>,
@@ -385,39 +195,181 @@ impl Agent {
         let mut presence = Presence::new(PresenceType::None).with_to(Jid::Full(room_jid));
         presence.add_payload(muc);
         presence.set_status(String::from(lang), String::from(status));
-        let presence = presence.into();
-        self.sender_tx
-            .unbounded_send(Packet::Stanza(presence))
-            .unwrap();
+        let _ = self.client.send_stanza(presence.into()).await;
     }
 
-    pub fn send_message(&mut self, recipient: Jid, type_: MessageType, lang: &str, text: &str) {
+    pub async fn send_message(
+        &mut self,
+        recipient: Jid,
+        type_: MessageType,
+        lang: &str,
+        text: &str,
+    ) {
         let mut message = Message::new(Some(recipient));
         message.type_ = type_;
         message
             .bodies
             .insert(String::from(lang), Body(String::from(text)));
-        let message = message.into();
-        self.sender_tx
-            .unbounded_send(Packet::Stanza(message))
-            .unwrap();
+        let _ = self.client.send_stanza(message.into()).await;
+    }
+
+    fn make_initial_presence(disco: &DiscoInfoResult, node: &str) -> Presence {
+        let caps_data = compute_disco(disco);
+        let hash = hash_caps(&caps_data, Algo::Sha_1).unwrap();
+        let caps = Caps::new(node, hash);
+
+        let mut presence = Presence::new(PresenceType::None);
+        presence.add_payload(caps);
+        presence
+    }
+
+    pub async fn wait_for_events(&mut self) -> Option<Vec<Event>> {
+        if let Some(event) = self.client.next().await {
+            let mut events = Vec::new();
+
+            match event {
+                TokioXmppEvent::Online(_) => {
+                    let presence = Self::make_initial_presence(&self.disco, &self.node).into();
+                    let _ = self.client.send_stanza(presence).await;
+                    events.push(Event::Online);
+                    // TODO: only send this when the ContactList feature is enabled.
+                    let iq = Iq::from_get(
+                        "roster",
+                        Roster {
+                            ver: None,
+                            items: vec![],
+                        },
+                    )
+                    .into();
+                    let _ = self.client.send_stanza(iq).await;
+                    // TODO: only send this when the JoinRooms feature is enabled.
+                    let iq =
+                        Iq::from_get("bookmarks", PubSub::Items(Items::new(ns::BOOKMARKS2))).into();
+                    let _ = self.client.send_stanza(iq).await;
+                }
+                TokioXmppEvent::Disconnected(_) => {
+                    events.push(Event::Disconnected);
+                }
+                TokioXmppEvent::Stanza(stanza) => {
+                    if stanza.is("iq", "jabber:client") {
+                        let iq = Iq::try_from(stanza).unwrap();
+                        let from = iq
+                            .from
+                            .clone()
+                            .unwrap_or_else(|| self.client.bound_jid().unwrap().clone());
+                        if let IqType::Get(payload) = iq.payload {
+                            if payload.is("query", ns::DISCO_INFO) {
+                                let query = DiscoInfoQuery::try_from(payload);
+                                match query {
+                                    Ok(query) => {
+                                        let mut disco_info = self.disco.clone();
+                                        disco_info.node = query.node;
+                                        let iq = Iq::from_result(iq.id, Some(disco_info))
+                                            .with_to(iq.from.unwrap())
+                                            .into();
+                                        let _ = self.client.send_stanza(iq).await;
+                                    }
+                                    Err(err) => {
+                                        let error = StanzaError::new(
+                                            ErrorType::Modify,
+                                            DefinedCondition::BadRequest,
+                                            "en",
+                                            &format!("{}", err),
+                                        );
+                                        let iq = Iq::from_error(iq.id, error)
+                                            .with_to(iq.from.unwrap())
+                                            .into();
+                                        let _ = self.client.send_stanza(iq).await;
+                                    }
+                                }
+                            } else {
+                                // We MUST answer unhandled get iqs with a service-unavailable error.
+                                let error = StanzaError::new(
+                                    ErrorType::Cancel,
+                                    DefinedCondition::ServiceUnavailable,
+                                    "en",
+                                    "No handler defined for this kind of iq.",
+                                );
+                                let iq = Iq::from_error(iq.id, error)
+                                    .with_to(iq.from.unwrap())
+                                    .into();
+                                let _ = self.client.send_stanza(iq).await;
+                            }
+                        } else if let IqType::Result(Some(payload)) = iq.payload {
+                            // TODO: move private iqs like this one somewhere else, for
+                            // security reasons.
+                            if payload.is("query", ns::ROSTER) && iq.from.is_none() {
+                                let roster = Roster::try_from(payload).unwrap();
+                                for item in roster.items.into_iter() {
+                                    events.push(Event::ContactAdded(item));
+                                }
+                            } else if payload.is("pubsub", ns::PUBSUB) {
+                                let new_events = pubsub::handle_iq_result(&from, payload);
+                                events.extend(new_events);
+                            }
+                        } else if let IqType::Set(_) = iq.payload {
+                            // We MUST answer unhandled set iqs with a service-unavailable error.
+                            let error = StanzaError::new(
+                                ErrorType::Cancel,
+                                DefinedCondition::ServiceUnavailable,
+                                "en",
+                                "No handler defined for this kind of iq.",
+                            );
+                            let iq = Iq::from_error(iq.id, error)
+                                .with_to(iq.from.unwrap())
+                                .into();
+                            let _ = self.client.send_stanza(iq).await;
+                        }
+                    } else if stanza.is("message", "jabber:client") {
+                        let message = Message::try_from(stanza).unwrap();
+                        let from = message.from.clone().unwrap();
+                        for child in message.payloads {
+                            if child.is("event", ns::PUBSUB_EVENT) {
+                                let new_events = pubsub::handle_event(&from, child, self).await;
+                                events.extend(new_events);
+                            }
+                        }
+                    } else if stanza.is("presence", "jabber:client") {
+                        let presence = Presence::try_from(stanza).unwrap();
+                        let from: BareJid = match presence.from.clone().unwrap() {
+                            Jid::Full(FullJid { node, domain, .. }) => BareJid { node, domain },
+                            Jid::Bare(bare) => bare,
+                        };
+                        for payload in presence.payloads.into_iter() {
+                            let muc_user = match MucUser::try_from(payload) {
+                                Ok(muc_user) => muc_user,
+                                _ => continue,
+                            };
+                            for status in muc_user.status.into_iter() {
+                                if status == Status::SelfPresence {
+                                    events.push(Event::RoomJoined(from.clone()));
+                                    break;
+                                }
+                            }
+                        }
+                    } else if stanza.is("error", "http://etherx.jabber.org/streams") {
+                        println!("Received a fatal stream error: {}", String::from(&stanza));
+                    } else {
+                        panic!("Unknown stanza: {}", String::from(&stanza));
+                    }
+                }
+            }
+
+            Some(events)
+        } else {
+            None
+        }
     }
 }
 
 #[cfg(test)]
 mod tests {
     use super::{Agent, ClientBuilder, ClientFeature, ClientType, Event};
-    use futures::prelude::*;
-    use futures::sync::mpsc;
-    use tokio::runtime::current_thread::Runtime;
     use tokio_xmpp::Client as TokioXmppClient;
 
-    #[test]
-    fn test_simple() {
-        // tokio_core context
-        let mut rt = Runtime::new().unwrap();
+    #[tokio::test]
+    async fn test_simple() {
         let client = TokioXmppClient::new("foo@bar", "meh").unwrap();
-        let (sender_tx, sender_rx) = mpsc::unbounded();
 
         // Client instance
         let client_builder = ClientBuilder::new("foo@bar", "meh")
@@ -427,16 +379,15 @@ mod tests {
             .enable_feature(ClientFeature::Avatars)
             .enable_feature(ClientFeature::ContactList);
 
-        let (_agent, stream): (Agent, _) = client_builder
-            .build_impl(client, sender_tx.clone(), sender_rx)
-            .unwrap();
-
-        let handler = stream.map_err(Some).for_each(|_evt: Event| {
-            return Err(None);
-        });
+        let mut agent: Agent = client_builder.build_impl(client).unwrap();
 
-        rt.block_on(handler).unwrap_or_else(|e| match e {
-            _ => (),
-        });
+        while let Some(events) = agent.wait_for_events().await {
+            assert!(match events[0] {
+                Event::Disconnected => true,
+                _ => false,
+            });
+            assert_eq!(events.len(), 1);
+            break;
+        }
     }
 }

xmpp-rs/src/pubsub/avatar.rs πŸ”—

@@ -4,12 +4,11 @@
 // License, v. 2.0. If a copy of the MPL was not distributed with this
 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
+use super::Agent;
 use crate::Event;
-use futures::{sync::mpsc, Sink};
 use std::convert::TryFrom;
 use std::fs::{self, File};
 use std::io::{self, Write};
-use tokio_xmpp::Packet;
 use xmpp_parsers::{
     avatar::{Data, Metadata},
     iq::Iq,
@@ -22,11 +21,11 @@ use xmpp_parsers::{
     Jid,
 };
 
-pub(crate) fn handle_metadata_pubsub_event(
+pub(crate) async fn handle_metadata_pubsub_event(
     from: &Jid,
-    tx: &mut mpsc::UnboundedSender<Packet>,
+    agent: &mut Agent,
     items: Vec<Item>,
-) -> impl IntoIterator<Item = Event> {
+) -> Vec<Event> {
     let mut events = Vec::new();
     for item in items {
         let payload = item.payload.clone().unwrap();
@@ -43,7 +42,7 @@ pub(crate) fn handle_metadata_pubsub_event(
                     events.push(Event::AvatarRetrieved(from.clone(), filename));
                 } else {
                     let iq = download_avatar(from);
-                    tx.start_send(Packet::Stanza(iq.into())).unwrap();
+                    let _ = agent.client.send_stanza(iq.into()).await;
                 }
             }
         }

xmpp-rs/src/pubsub/mod.rs πŸ”—

@@ -4,11 +4,10 @@
 // License, v. 2.0. If a copy of the MPL was not distributed with this
 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
+use super::Agent;
 use crate::Event;
-use futures::sync::mpsc;
 use std::convert::TryFrom;
 use std::str::FromStr;
-use tokio_xmpp::Packet;
 use xmpp_parsers::{
     bookmarks2::{Autojoin, Conference},
     ns,
@@ -20,11 +19,7 @@ use xmpp_parsers::{
 #[cfg(feature = "avatars")]
 pub(crate) mod avatar;
 
-pub(crate) fn handle_event(
-    from: &Jid,
-    elem: Element,
-    mut tx: &mut mpsc::UnboundedSender<Packet>,
-) -> impl IntoIterator<Item = Event> {
+pub(crate) async fn handle_event(from: &Jid, elem: Element, agent: &mut Agent) -> Vec<Event> {
     let mut events = Vec::new();
     let event = PubSubEvent::try_from(elem);
     trace!("PubSub event: {:#?}", event);
@@ -33,7 +28,8 @@ pub(crate) fn handle_event(
             match node.0 {
                 #[cfg(feature = "avatars")]
                 ref node if node == ns::AVATAR_METADATA => {
-                    let new_events = avatar::handle_metadata_pubsub_event(&from, &mut tx, items);
+                    let new_events =
+                        avatar::handle_metadata_pubsub_event(&from, agent, items).await;
                     events.extend(new_events);
                 }
                 ref node if node == ns::BOOKMARKS2 => {