diff --git a/tokio-xmpp/examples/contact_addr.rs b/tokio-xmpp/examples/contact_addr.rs
index d9a14484cec58323640dc82b89426a4a5c3fd9b6..343b90d9408b7823226e8add3a4529a480d88a09 100644
--- a/tokio-xmpp/examples/contact_addr.rs
+++ b/tokio-xmpp/examples/contact_addr.rs
@@ -28,39 +28,31 @@ async fn main() {
let mut client = Client::new(jid, password);
// Main loop, processes events
- let mut wait_for_stream_end = false;
- let mut stream_ended = false;
- while !stream_ended {
- if let Some(event) = client.next().await {
- if wait_for_stream_end {
- /* Do Nothing. */
- } else if event.is_online() {
- println!("Online!");
+ while let Some(event) = client.next().await {
+ if event.is_online() {
+ println!("Online!");
- let target_jid: Jid = target.clone().parse().unwrap();
- let iq = make_disco_iq(target_jid);
- println!("Sending disco#info request to {}", target.clone());
- println!(">> {:?}", iq);
- client.send_stanza(iq.into()).await.unwrap();
- } else if let Some(Stanza::Iq(iq)) = event.into_stanza() {
- if let IqType::Result(Some(payload)) = iq.payload {
- if payload.is("query", ns::DISCO_INFO) {
- if let Ok(disco_info) = DiscoInfoResult::try_from(payload) {
- for ext in disco_info.extensions {
- if let Ok(server_info) = ServerInfo::try_from(ext) {
- print_server_info(server_info);
- }
+ let target_jid: Jid = target.clone().parse().unwrap();
+ let iq = make_disco_iq(target_jid);
+ println!("Sending disco#info request to {}", target.clone());
+ println!(">> {:?}", iq);
+ client.send_stanza(iq.into()).await.unwrap();
+ } else if let Some(Stanza::Iq(iq)) = event.into_stanza() {
+ if let IqType::Result(Some(payload)) = iq.payload {
+ if payload.is("query", ns::DISCO_INFO) {
+ if let Ok(disco_info) = DiscoInfoResult::try_from(payload) {
+ for ext in disco_info.extensions {
+ if let Ok(server_info) = ServerInfo::try_from(ext) {
+ print_server_info(server_info);
}
}
}
- wait_for_stream_end = true;
- client.send_end().await.unwrap();
}
+ break;
}
- } else {
- stream_ended = true;
}
}
+ client.send_end().await.expect("Stream shutdown unclean");
}
fn make_disco_iq(target: Jid) -> Iq {
diff --git a/tokio-xmpp/examples/download_avatars.rs b/tokio-xmpp/examples/download_avatars.rs
index 7192639fa47a26263b9f912373e4a922a702c865..bedae361fdd508d51172d73c179483030c100f82 100644
--- a/tokio-xmpp/examples/download_avatars.rs
+++ b/tokio-xmpp/examples/download_avatars.rs
@@ -40,33 +40,29 @@ async fn main() {
let disco_info = make_disco();
// Main loop, processes events
- let mut wait_for_stream_end = false;
- let mut stream_ended = false;
- while !stream_ended {
- if let Some(event) = client.next().await {
- if wait_for_stream_end {
- /* Do nothing */
- } else if event.is_online() {
- println!("Online!");
+ while let Some(event) = client.next().await {
+ if event.is_online() {
+ println!("Online!");
- let caps = get_disco_caps(&disco_info, "https://gitlab.com/xmpp-rs/tokio-xmpp");
- let presence = make_presence(caps);
- client.send_stanza(presence.into()).await.unwrap();
- } else if let Some(stanza) = event.into_stanza() {
- match stanza {
- Stanza::Iq(iq) => {
- if let IqType::Get(payload) = iq.payload {
- if payload.is("query", ns::DISCO_INFO) {
- let query = DiscoInfoQuery::try_from(payload);
- match query {
- Ok(query) => {
- let mut disco = disco_info.clone();
- disco.node = query.node;
- let iq = Iq::from_result(iq.id, Some(disco))
- .with_to(iq.from.unwrap());
- client.send_stanza(iq.into()).await.unwrap();
- }
- Err(err) => client
+ let caps = get_disco_caps(&disco_info, "https://gitlab.com/xmpp-rs/tokio-xmpp");
+ let presence = make_presence(caps);
+ client.send_stanza(presence.into()).await.unwrap();
+ } else if let Some(stanza) = event.into_stanza() {
+ match stanza {
+ Stanza::Iq(iq) => {
+ if let IqType::Get(payload) = iq.payload {
+ if payload.is("query", ns::DISCO_INFO) {
+ let query = DiscoInfoQuery::try_from(payload);
+ match query {
+ Ok(query) => {
+ let mut disco = disco_info.clone();
+ disco.node = query.node;
+ let iq = Iq::from_result(iq.id, Some(disco))
+ .with_to(iq.from.unwrap());
+ client.send_stanza(iq.into()).await.unwrap();
+ }
+ Err(err) => {
+ client
.send_stanza(
make_error(
iq.from.unwrap(),
@@ -78,32 +74,11 @@ async fn main() {
.into(),
)
.await
- .unwrap(),
+ .unwrap();
}
- } else {
- // We MUST answer unhandled get iqs with a service-unavailable error.
- client
- .send_stanza(
- make_error(
- iq.from.unwrap(),
- iq.id,
- ErrorType::Cancel,
- DefinedCondition::ServiceUnavailable,
- "No handler defined for this kind of iq.",
- )
- .into(),
- )
- .await
- .unwrap();
- }
- } else if let IqType::Result(Some(payload)) = iq.payload {
- if payload.is("pubsub", ns::PUBSUB) {
- let pubsub = PubSub::try_from(payload).unwrap();
- let from = iq.from.clone().unwrap_or(jid.clone().into());
- handle_iq_result(pubsub, &from);
}
- } else if let IqType::Set(_) = iq.payload {
- // We MUST answer unhandled set iqs with a service-unavailable error.
+ } else {
+ // We MUST answer unhandled get iqs with a service-unavailable error.
client
.send_stanza(
make_error(
@@ -118,47 +93,64 @@ async fn main() {
.await
.unwrap();
}
+ } else if let IqType::Result(Some(payload)) = iq.payload {
+ if payload.is("pubsub", ns::PUBSUB) {
+ let pubsub = PubSub::try_from(payload).unwrap();
+ let from = iq.from.clone().unwrap_or(jid.clone().into());
+ handle_iq_result(pubsub, &from);
+ }
+ } else if let IqType::Set(_) = iq.payload {
+ // We MUST answer unhandled set iqs with a service-unavailable error.
+ client
+ .send_stanza(
+ make_error(
+ iq.from.unwrap(),
+ iq.id,
+ ErrorType::Cancel,
+ DefinedCondition::ServiceUnavailable,
+ "No handler defined for this kind of iq.",
+ )
+ .into(),
+ )
+ .await
+ .unwrap();
}
- Stanza::Message(message) => {
- let from = message.from.clone().unwrap();
- if let Some(body) = message.get_best_body(vec!["en"]) {
- if body.0 == "die" {
- println!("Secret die command triggered by {}", from);
- wait_for_stream_end = true;
- client.send_end().await.unwrap();
- }
+ }
+ Stanza::Message(message) => {
+ let from = message.from.clone().unwrap();
+ if let Some(body) = message.get_best_body(vec!["en"]) {
+ if body.0 == "die" {
+ println!("Secret die command triggered by {}", from);
+ break;
}
- for child in message.payloads {
- if child.is("event", ns::PUBSUB_EVENT) {
- let event = PubSubEvent::try_from(child).unwrap();
- if let PubSubEvent::PublishedItems { node, items } = event {
- if node.0 == ns::AVATAR_METADATA {
- for item in items.into_iter() {
- let payload = item.payload.clone().unwrap();
- if payload.is("metadata", ns::AVATAR_METADATA) {
- // TODO: do something with these metadata.
- let _metadata =
- AvatarMetadata::try_from(payload).unwrap();
- println!(
- "[1m{}[0m has published an avatar, downloading...",
- from.clone()
- );
- let iq = download_avatar(from.clone());
- client.send_stanza(iq.into()).await.unwrap();
- }
+ }
+ for child in message.payloads {
+ if child.is("event", ns::PUBSUB_EVENT) {
+ let event = PubSubEvent::try_from(child).unwrap();
+ if let PubSubEvent::PublishedItems { node, items } = event {
+ if node.0 == ns::AVATAR_METADATA {
+ for item in items.into_iter() {
+ let payload = item.payload.clone().unwrap();
+ if payload.is("metadata", ns::AVATAR_METADATA) {
+ // TODO: do something with these metadata.
+ let _metadata =
+ AvatarMetadata::try_from(payload).unwrap();
+ println!(
+ "[1m{}[0m has published an avatar, downloading...",
+ from.clone()
+ );
+ let iq = download_avatar(from.clone());
+ client.send_stanza(iq.into()).await.unwrap();
}
}
}
}
}
}
- // Nothing to do here.
- Stanza::Presence(_) => (),
}
+ // Nothing to do here.
+ Stanza::Presence(_) => (),
}
- } else {
- println!("stream_ended");
- stream_ended = true;
}
}
}
diff --git a/tokio-xmpp/examples/echo_bot.rs b/tokio-xmpp/examples/echo_bot.rs
index 055ff8f770153b72f37137203b9afe724c443000..204d2d01d76704250aeef3c6167509df5d5afd7c 100644
--- a/tokio-xmpp/examples/echo_bot.rs
+++ b/tokio-xmpp/examples/echo_bot.rs
@@ -21,50 +21,41 @@ async fn main() {
// Client instance
let mut client = Client::new(jid, password.to_owned());
- client.set_reconnect(true);
// Main loop, processes events
- let mut wait_for_stream_end = false;
- let mut stream_ended = false;
- while !stream_ended {
- if let Some(event) = client.next().await {
- println!("event: {:?}", event);
- if wait_for_stream_end {
- /* Do nothing */
- } else if event.is_online() {
- let jid = event
- .get_jid()
- .map(|jid| format!("{}", jid))
- .unwrap_or("unknown".to_owned());
- println!("Online at {}", jid);
+ while let Some(event) = client.next().await {
+ println!("event: {:?}", event);
+ if event.is_online() {
+ let jid = event
+ .get_jid()
+ .map(|jid| format!("{}", jid))
+ .unwrap_or("unknown".to_owned());
+ println!("Online at {}", jid);
- let presence = make_presence();
- client.send_stanza(presence.into()).await.unwrap();
- } else if let Some(message) = event
- .into_stanza()
- .and_then(|stanza| Message::try_from(stanza).ok())
- {
- match (message.from, message.bodies.get("")) {
- (Some(ref from), Some(ref body)) if body.0 == "die" => {
- println!("Secret die command triggered by {}", from);
- wait_for_stream_end = true;
- client.send_end().await.unwrap();
- }
- (Some(ref from), Some(ref body)) => {
- if message.type_ != MessageType::Error {
- // This is a message we'll echo
- let reply = make_reply(from.clone(), &body.0);
- client.send_stanza(reply.into()).await.unwrap();
- }
+ let presence = make_presence();
+ client.send_stanza(presence.into()).await.unwrap();
+ } else if let Some(message) = event
+ .into_stanza()
+ .and_then(|stanza| Message::try_from(stanza).ok())
+ {
+ match (message.from, message.bodies.get("")) {
+ (Some(ref from), Some(ref body)) if body.0 == "die" => {
+ println!("Secret die command triggered by {}", from);
+ break;
+ }
+ (Some(ref from), Some(ref body)) => {
+ if message.type_ != MessageType::Error {
+ // This is a message we'll echo
+ let reply = make_reply(from.clone(), &body.0);
+ client.send_stanza(reply.into()).await.unwrap();
}
- _ => {}
}
+ _ => {}
}
- } else {
- println!("stream_ended");
- stream_ended = true;
}
}
+
+ client.send_end().await.unwrap();
}
// Construct a
diff --git a/tokio-xmpp/src/client/bind.rs b/tokio-xmpp/src/client/bind.rs
deleted file mode 100644
index 7cacab130e1406476b20eae12d27f8637175ff5c..0000000000000000000000000000000000000000
--- a/tokio-xmpp/src/client/bind.rs
+++ /dev/null
@@ -1,56 +0,0 @@
-use std::io;
-
-use futures::{SinkExt, StreamExt};
-use tokio::io::{AsyncBufRead, AsyncWrite};
-use xmpp_parsers::bind::{BindQuery, BindResponse};
-use xmpp_parsers::iq::{Iq, IqType};
-use xmpp_parsers::stream_features::StreamFeatures;
-
-use crate::error::{Error, ProtocolError};
-use crate::event::Stanza;
-use crate::jid::{FullJid, Jid};
-use crate::xmlstream::{ReadError, XmppStream, XmppStreamElement};
-
-const BIND_REQ_ID: &str = "resource-bind";
-
-pub async fn bind(
- stream: &mut XmppStream,
- features: &StreamFeatures,
- jid: &Jid,
-) -> Result