event_loop.rs

 1// Copyright (c) 2023 xmpp-rs contributors.
 2//
 3// This Source Code Form is subject to the terms of the Mozilla Public
 4// License, v. 2.0. If a copy of the MPL was not distributed with this
 5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
 6
 7use futures::StreamExt;
 8use tokio_xmpp::connect::ServerConnector;
 9use tokio_xmpp::{
10    parsers::{
11        disco::DiscoInfoQuery, iq::Iq, message::Message, presence::Presence, roster::Roster,
12    },
13    Event as TokioXmppEvent,
14};
15
16use crate::{iq, message, presence, Agent, Event};
17
18/// Wait for new events, or Error::Disconnected when stream is closed and will not reconnect.
19pub async fn wait_for_events<C: ServerConnector>(agent: &mut Agent<C>) -> Vec<Event> {
20    if let Some(event) = agent.client.next().await {
21        let mut events = Vec::new();
22
23        match event {
24            TokioXmppEvent::Online { resumed: false, .. } => {
25                let presence =
26                    presence::send::make_initial_presence(&agent.disco, &agent.node).into();
27                let _ = agent.client.send_stanza(presence).await;
28                events.push(Event::Online);
29                // TODO: only send this when the ContactList feature is enabled.
30                let iq = Iq::from_get(
31                    "roster",
32                    Roster {
33                        ver: None,
34                        items: vec![],
35                    },
36                )
37                .into();
38                let _ = agent.client.send_stanza(iq).await;
39
40                // Query account disco to know what bookmarks spec is used
41                let iq = Iq::from_get("disco-account", DiscoInfoQuery { node: None }).into();
42                let _ = agent.client.send_stanza(iq).await;
43                agent.awaiting_disco_bookmarks_type = true;
44            }
45            TokioXmppEvent::Online { resumed: true, .. } => {}
46            TokioXmppEvent::Disconnected(e) => {
47                events.push(Event::Disconnected(e));
48            }
49            TokioXmppEvent::Stanza(elem) => {
50                if elem.is("iq", "jabber:client") {
51                    let iq = Iq::try_from(elem).unwrap();
52                    let new_events = iq::handle_iq(agent, iq).await;
53                    events.extend(new_events);
54                } else if elem.is("message", "jabber:client") {
55                    let message = Message::try_from(elem).unwrap();
56                    let new_events = message::receive::handle_message(agent, message).await;
57                    events.extend(new_events);
58                } else if elem.is("presence", "jabber:client") {
59                    let presence = Presence::try_from(elem).unwrap();
60                    let new_events = presence::receive::handle_presence(agent, presence).await;
61                    events.extend(new_events);
62                } else if elem.is("error", "http://etherx.jabber.org/streams") {
63                    println!("Received a fatal stream error: {}", String::from(&elem));
64                } else {
65                    panic!("Unknown stanza: {}", String::from(&elem));
66                }
67            }
68        }
69
70        events
71    } else {
72        // Stream was closed and not opening again because TokioXmppClient reconnect is false
73        // However we set reconnect true in agent builder so this should never happen and indicates
74        // logic error in tokio_xmpp::AsyncClient::poll_next
75        panic!("xmpp::Agent should never receive None event (stream closed, no reconnect)");
76    }
77}