event_loop.rs

 1// Copyright (c) 2023 xmpp-rs contributors.
 2//
 3// This Source Code Form is subject to the terms of the Mozilla Public
 4// License, v. 2.0. If a copy of the MPL was not distributed with this
 5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
 6
 7use futures::StreamExt;
 8use tokio_xmpp::connect::ServerConnector;
 9use tokio_xmpp::{
10    parsers::{disco::DiscoInfoQuery, iq::Iq, roster::Roster},
11    Event as TokioXmppEvent, Stanza,
12};
13
14use crate::{iq, message, presence, Agent, Event};
15
16/// Wait for new events, or Error::Disconnected when stream is closed and will not reconnect.
17pub async fn wait_for_events<C: ServerConnector>(agent: &mut Agent<C>) -> Vec<Event> {
18    if let Some(event) = agent.client.next().await {
19        let mut events = Vec::new();
20
21        match event {
22            TokioXmppEvent::Online { resumed: false, .. } => {
23                let presence =
24                    presence::send::make_initial_presence(&agent.disco, &agent.node).into();
25                let _ = agent.client.send_stanza(presence).await;
26                events.push(Event::Online);
27                // TODO: only send this when the ContactList feature is enabled.
28                let iq = Iq::from_get(
29                    "roster",
30                    Roster {
31                        ver: None,
32                        items: vec![],
33                    },
34                )
35                .into();
36                let _ = agent.client.send_stanza(iq).await;
37
38                // Query account disco to know what bookmarks spec is used
39                let iq = Iq::from_get("disco-account", DiscoInfoQuery { node: None }).into();
40                let _ = agent.client.send_stanza(iq).await;
41                agent.awaiting_disco_bookmarks_type = true;
42            }
43            TokioXmppEvent::Online { resumed: true, .. } => {}
44            TokioXmppEvent::Disconnected(e) => {
45                events.push(Event::Disconnected(e));
46            }
47            TokioXmppEvent::Stanza(Stanza::Iq(iq)) => {
48                let new_events = iq::handle_iq(agent, iq).await;
49                events.extend(new_events);
50            }
51            TokioXmppEvent::Stanza(Stanza::Message(message)) => {
52                let new_events = message::receive::handle_message(agent, message).await;
53                events.extend(new_events);
54            }
55            TokioXmppEvent::Stanza(Stanza::Presence(presence)) => {
56                let new_events = presence::receive::handle_presence(agent, presence).await;
57                events.extend(new_events);
58            }
59        }
60
61        events
62    } else {
63        // Stream was closed and not opening again because TokioXmppClient reconnect is false
64        // However we set reconnect true in agent builder so this should never happen and indicates
65        // logic error in tokio_xmpp::AsyncClient::poll_next
66        panic!("xmpp::Agent should never receive None event (stream closed, no reconnect)");
67    }
68}