Detailed changes
@@ -1,13 +1,13 @@
+use futures::{future, Sink, Stream};
+use jid::Jid;
+use minidom::Element;
use std::env::args;
use std::process::exit;
-use try_from::TryFrom;
-use futures::{Stream, Sink, future};
use tokio::runtime::current_thread::Runtime;
use tokio_xmpp::Client;
-use minidom::Element;
-use xmpp_parsers::presence::{Presence, Type as PresenceType, Show as PresenceShow};
-use xmpp_parsers::message::{Message, MessageType, Body};
-use jid::Jid;
+use try_from::TryFrom;
+use xmpp_parsers::message::{Body, Message, MessageType};
+use xmpp_parsers::presence::{Presence, Show as PresenceShow, Type as PresenceType};
fn main() {
let args: Vec<String> = args().collect();
@@ -38,16 +38,18 @@ fn main() {
let presence = make_presence();
send(presence);
- } else if let Some(message) = event.into_stanza()
+ } else if let Some(message) = event
+ .into_stanza()
.and_then(|stanza| Message::try_from(stanza).ok())
{
// This is a message we'll echo
match (message.from, message.bodies.get("")) {
- (Some(from), Some(body)) =>
+ (Some(from), Some(body)) => {
if message.type_ != MessageType::Error {
let reply = make_reply(from, &body.0);
send(reply);
- },
+ }
+ }
_ => (),
}
}
@@ -69,7 +71,9 @@ fn main() {
fn make_presence() -> Element {
let mut presence = Presence::new(PresenceType::None);
presence.show = PresenceShow::Chat;
- presence.statuses.insert(String::from("en"), String::from("Echoing messages."));
+ presence
+ .statuses
+ .insert(String::from("en"), String::from("Echoing messages."));
presence.into()
}
@@ -1,14 +1,14 @@
+use futures::{future, Sink, Stream};
+use jid::Jid;
+use minidom::Element;
use std::env::args;
use std::process::exit;
use std::str::FromStr;
-use try_from::TryFrom;
use tokio::runtime::current_thread::Runtime;
-use futures::{Stream, Sink, future};
use tokio_xmpp::Component;
-use minidom::Element;
-use xmpp_parsers::presence::{Presence, Type as PresenceType, Show as PresenceShow};
-use xmpp_parsers::message::{Message, MessageType, Body};
-use jid::Jid;
+use try_from::TryFrom;
+use xmpp_parsers::message::{Body, Message, MessageType};
+use xmpp_parsers::presence::{Presence, Show as PresenceShow, Type as PresenceType};
fn main() {
let args: Vec<String> = args().collect();
@@ -18,7 +18,11 @@ fn main() {
}
let jid = &args[1];
let password = &args[2];
- let server = &args.get(3).unwrap().parse().unwrap_or("127.0.0.1".to_owned());
+ let server = &args
+ .get(3)
+ .unwrap()
+ .parse()
+ .unwrap_or("127.0.0.1".to_owned());
let port: u16 = args.get(4).unwrap().parse().unwrap_or(5347u16);
// tokio_core context
@@ -42,18 +46,23 @@ fn main() {
println!("Online!");
// TODO: replace these hardcoded JIDs
- let presence = make_presence(Jid::from_str("test@component.linkmauve.fr/coucou").unwrap(), Jid::from_str("linkmauve@linkmauve.fr").unwrap());
+ let presence = make_presence(
+ Jid::from_str("test@component.linkmauve.fr/coucou").unwrap(),
+ Jid::from_str("linkmauve@linkmauve.fr").unwrap(),
+ );
send(presence);
- } else if let Some(message) = event.into_stanza()
+ } else if let Some(message) = event
+ .into_stanza()
.and_then(|stanza| Message::try_from(stanza).ok())
{
// This is a message we'll echo
match (message.from, message.bodies.get("")) {
- (Some(from), Some(body)) =>
+ (Some(from), Some(body)) => {
if message.type_ != MessageType::Error {
let reply = make_reply(from, &body.0);
send(reply);
- },
+ }
+ }
_ => (),
}
}
@@ -77,7 +86,9 @@ fn make_presence(from: Jid, to: Jid) -> Element {
presence.from = Some(from);
presence.to = Some(to);
presence.show = PresenceShow::Chat;
- presence.statuses.insert(String::from("en"), String::from("Echoing messages."));
+ presence
+ .statuses
+ .insert(String::from("en"), String::from("Echoing messages."));
presence.into()
}
@@ -1,23 +1,22 @@
+use futures::{sink, Async, Future, Poll, Stream};
+use minidom::Element;
+use sasl::client::mechanisms::{Anonymous, Plain, Scram};
+use sasl::client::Mechanism;
+use sasl::common::scram::{Sha1, Sha256};
+use sasl::common::Credentials;
use std::mem::replace;
use std::str::FromStr;
-use futures::{Future, Poll, Async, sink, Stream};
use tokio_io::{AsyncRead, AsyncWrite};
-use sasl::common::Credentials;
-use sasl::common::scram::{Sha1, Sha256};
-use sasl::client::Mechanism;
-use sasl::client::mechanisms::{Scram, Plain, Anonymous};
-use minidom::Element;
-use xmpp_parsers::sasl::{Auth, Challenge, Response, Success, Failure, Mechanism as XMPPMechanism};
use try_from::TryFrom;
+use xmpp_parsers::sasl::{Auth, Challenge, Failure, Mechanism as XMPPMechanism, Response, Success};
+use crate::stream_start::StreamStart;
use crate::xmpp_codec::Packet;
use crate::xmpp_stream::XMPPStream;
-use crate::stream_start::StreamStart;
-use crate::{Error, AuthError, ProtocolError};
+use crate::{AuthError, Error, ProtocolError};
const NS_XMPP_SASL: &str = "urn:ietf:params:xml:ns:xmpp-sasl";
-
pub struct ClientAuth<S: AsyncWrite> {
state: ClientAuthState<S>,
mechanism: Box<Mechanism>,
@@ -39,8 +38,9 @@ impl<S: AsyncWrite> ClientAuth<S> {
Box::new(Anonymous::new()),
];
- let mech_names: Vec<String> =
- stream.stream_features.get_child("mechanisms", NS_XMPP_SASL)
+ let mech_names: Vec<String> = stream
+ .stream_features
+ .get_child("mechanisms", NS_XMPP_SASL)
.ok_or(AuthError::NoMechanism)?
.children()
.filter(|child| child.is("mechanism", NS_XMPP_SASL))
@@ -52,20 +52,18 @@ impl<S: AsyncWrite> ClientAuth<S> {
let name = mech.name().to_owned();
if mech_names.iter().any(|name1| *name1 == name) {
// println!("SASL mechanism selected: {:?}", name);
- let initial = mech.initial()
- .map_err(AuthError::Sasl)?;
+ let initial = mech.initial().map_err(AuthError::Sasl)?;
let mut this = ClientAuth {
state: ClientAuthState::Invalid,
mechanism: mech,
};
- let mechanism = XMPPMechanism::from_str(&name)
- .map_err(ProtocolError::Parsers)?;
+ let mechanism = XMPPMechanism::from_str(&name).map_err(ProtocolError::Parsers)?;
this.send(
stream,
Auth {
mechanism,
data: initial,
- }
+ },
);
return Ok(this);
}
@@ -89,61 +87,55 @@ impl<S: AsyncRead + AsyncWrite> Future for ClientAuth<S> {
let state = replace(&mut self.state, ClientAuthState::Invalid);
match state {
- ClientAuthState::WaitSend(mut send) =>
- match send.poll() {
- Ok(Async::Ready(stream)) => {
- self.state = ClientAuthState::WaitRecv(stream);
+ ClientAuthState::WaitSend(mut send) => match send.poll() {
+ Ok(Async::Ready(stream)) => {
+ self.state = ClientAuthState::WaitRecv(stream);
+ self.poll()
+ }
+ Ok(Async::NotReady) => {
+ self.state = ClientAuthState::WaitSend(send);
+ Ok(Async::NotReady)
+ }
+ Err(e) => Err(e)?,
+ },
+ ClientAuthState::WaitRecv(mut stream) => match stream.poll() {
+ Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
+ if let Ok(challenge) = Challenge::try_from(stanza.clone()) {
+ let response = self
+ .mechanism
+ .response(&challenge.data)
+ .map_err(AuthError::Sasl)?;
+ self.send(stream, Response { data: response });
self.poll()
- },
- Ok(Async::NotReady) => {
- self.state = ClientAuthState::WaitSend(send);
- Ok(Async::NotReady)
- },
- Err(e) =>
- Err(e)?,
- },
- ClientAuthState::WaitRecv(mut stream) =>
- match stream.poll() {
- Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
- if let Ok(challenge) = Challenge::try_from(stanza.clone()) {
- let response = self.mechanism.response(&challenge.data)
- .map_err(AuthError::Sasl)?;
- self.send(stream, Response { data: response });
- self.poll()
- } else if let Ok(_) = Success::try_from(stanza.clone()) {
- let start = stream.restart();
- self.state = ClientAuthState::Start(start);
- self.poll()
- } else if let Ok(failure) = Failure::try_from(stanza) {
- Err(AuthError::Fail(failure.defined_condition))?
- } else {
- Ok(Async::NotReady)
- }
- }
- Ok(Async::Ready(_event)) => {
- // println!("ClientAuth ignore {:?}", _event);
- Ok(Async::NotReady)
- },
- Ok(_) => {
- self.state = ClientAuthState::WaitRecv(stream);
- Ok(Async::NotReady)
- },
- Err(e) =>
- Err(ProtocolError::Parser(e))?
- },
- ClientAuthState::Start(mut start) =>
- match start.poll() {
- Ok(Async::Ready(stream)) =>
- Ok(Async::Ready(stream)),
- Ok(Async::NotReady) => {
+ } else if let Ok(_) = Success::try_from(stanza.clone()) {
+ let start = stream.restart();
self.state = ClientAuthState::Start(start);
+ self.poll()
+ } else if let Ok(failure) = Failure::try_from(stanza) {
+ Err(AuthError::Fail(failure.defined_condition))?
+ } else {
Ok(Async::NotReady)
- },
- Err(e) =>
- Err(e)
- },
- ClientAuthState::Invalid =>
- unreachable!(),
+ }
+ }
+ Ok(Async::Ready(_event)) => {
+ // println!("ClientAuth ignore {:?}", _event);
+ Ok(Async::NotReady)
+ }
+ Ok(_) => {
+ self.state = ClientAuthState::WaitRecv(stream);
+ Ok(Async::NotReady)
+ }
+ Err(e) => Err(ProtocolError::Parser(e))?,
+ },
+ ClientAuthState::Start(mut start) => match start.poll() {
+ Ok(Async::Ready(stream)) => Ok(Async::Ready(stream)),
+ Ok(Async::NotReady) => {
+ self.state = ClientAuthState::Start(start);
+ Ok(Async::NotReady)
+ }
+ Err(e) => Err(e),
+ },
+ ClientAuthState::Invalid => unreachable!(),
}
}
}
@@ -1,9 +1,9 @@
+use futures::{sink, Async, Future, Poll, Stream};
use std::mem::replace;
-use futures::{Future, Poll, Async, sink, Stream};
use tokio_io::{AsyncRead, AsyncWrite};
-use xmpp_parsers::iq::{Iq, IqType};
-use xmpp_parsers::bind::Bind;
use try_from::TryFrom;
+use xmpp_parsers::bind::Bind;
+use xmpp_parsers::iq::{Iq, IqType};
use crate::xmpp_codec::Packet;
use crate::xmpp_stream::XMPPStream;
@@ -26,16 +26,17 @@ impl<S: AsyncWrite> ClientBind<S> {
pub fn new(stream: XMPPStream<S>) -> Self {
match stream.stream_features.get_child("bind", NS_XMPP_BIND) {
None =>
- // No resource binding available,
- // return the (probably // usable) stream immediately
- ClientBind::Unsupported(stream),
+ // No resource binding available,
+ // return the (probably // usable) stream immediately
+ {
+ ClientBind::Unsupported(stream)
+ }
Some(_) => {
let resource = stream.jid.resource.clone();
- let iq = Iq::from_set(Bind::new(resource))
- .with_id(BIND_REQ_ID.to_string());
+ let iq = Iq::from_set(Bind::new(resource)).with_id(BIND_REQ_ID.to_string());
let send = stream.send_stanza(iq);
ClientBind::WaitSend(send)
- },
+ }
}
}
}
@@ -48,59 +49,51 @@ impl<S: AsyncRead + AsyncWrite> Future for ClientBind<S> {
let state = replace(self, ClientBind::Invalid);
match state {
- ClientBind::Unsupported(stream) =>
- Ok(Async::Ready(stream)),
- ClientBind::WaitSend(mut send) => {
- match send.poll() {
- Ok(Async::Ready(stream)) => {
- replace(self, ClientBind::WaitRecv(stream));
- self.poll()
- },
- Ok(Async::NotReady) => {
- replace(self, ClientBind::WaitSend(send));
- Ok(Async::NotReady)
- },
- Err(e) =>
- Err(e)?
+ ClientBind::Unsupported(stream) => Ok(Async::Ready(stream)),
+ ClientBind::WaitSend(mut send) => match send.poll() {
+ Ok(Async::Ready(stream)) => {
+ replace(self, ClientBind::WaitRecv(stream));
+ self.poll()
+ }
+ Ok(Async::NotReady) => {
+ replace(self, ClientBind::WaitSend(send));
+ Ok(Async::NotReady)
}
+ Err(e) => Err(e)?,
},
- ClientBind::WaitRecv(mut stream) => {
- match stream.poll() {
- Ok(Async::Ready(Some(Packet::Stanza(stanza)))) =>
- match Iq::try_from(stanza) {
- Ok(iq) => if iq.id == Some(BIND_REQ_ID.to_string()) {
- match iq.payload {
- IqType::Result(payload) => {
- payload
- .and_then(|payload| Bind::try_from(payload).ok())
- .map(|bind| match bind {
- Bind::Jid(jid) => stream.jid = jid,
- _ => {}
- });
- Ok(Async::Ready(stream))
- },
- _ =>
- Err(ProtocolError::InvalidBindResponse)?,
+ ClientBind::WaitRecv(mut stream) => match stream.poll() {
+ Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => match Iq::try_from(stanza) {
+ Ok(iq) => {
+ if iq.id == Some(BIND_REQ_ID.to_string()) {
+ match iq.payload {
+ IqType::Result(payload) => {
+ payload
+ .and_then(|payload| Bind::try_from(payload).ok())
+ .map(|bind| match bind {
+ Bind::Jid(jid) => stream.jid = jid,
+ _ => {}
+ });
+ Ok(Async::Ready(stream))
}
- } else {
- Ok(Async::NotReady)
- },
- _ => Ok(Async::NotReady),
- },
- Ok(Async::Ready(_)) => {
- replace(self, ClientBind::WaitRecv(stream));
- self.poll()
- },
- Ok(Async::NotReady) => {
- replace(self, ClientBind::WaitRecv(stream));
- Ok(Async::NotReady)
- },
- Err(e) =>
- Err(e)?,
+ _ => Err(ProtocolError::InvalidBindResponse)?,
+ }
+ } else {
+ Ok(Async::NotReady)
+ }
+ }
+ _ => Ok(Async::NotReady),
+ },
+ Ok(Async::Ready(_)) => {
+ replace(self, ClientBind::WaitRecv(stream));
+ self.poll()
+ }
+ Ok(Async::NotReady) => {
+ replace(self, ClientBind::WaitRecv(stream));
+ Ok(Async::NotReady)
}
+ Err(e) => Err(e)?,
},
- ClientBind::Invalid =>
- unreachable!(),
+ ClientBind::Invalid => unreachable!(),
}
}
}
@@ -1,19 +1,19 @@
+use futures::{done, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
+use idna;
+use jid::{Jid, JidParseError};
+use minidom::Element;
+use sasl::common::{ChannelBinding, Credentials};
use std::mem::replace;
use std::str::FromStr;
use tokio::net::TcpStream;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tls::TlsStream;
-use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink, done};
-use minidom::Element;
-use jid::{Jid, JidParseError};
-use sasl::common::{Credentials, ChannelBinding};
-use idna;
+use super::event::Event;
+use super::happy_eyeballs::Connecter;
+use super::starttls::{StartTlsClient, NS_XMPP_TLS};
use super::xmpp_codec::Packet;
use super::xmpp_stream;
-use super::starttls::{NS_XMPP_TLS, StartTlsClient};
-use super::happy_eyeballs::Connecter;
-use super::event::Event;
use super::{Error, ProtocolError};
mod auth;
@@ -34,7 +34,7 @@ const NS_JABBER_CLIENT: &str = "jabber:client";
enum ClientState {
Invalid,
Disconnected,
- Connecting(Box<Future<Item=XMPPStream, Error=Error>>),
+ Connecting(Box<Future<Item = XMPPStream, Error = Error>>),
Connected(XMPPStream),
}
@@ -53,51 +53,62 @@ impl Client {
})
}
- fn make_connect(jid: Jid, password: String) -> impl Future<Item=XMPPStream, Error=Error> {
+ fn make_connect(jid: Jid, password: String) -> impl Future<Item = XMPPStream, Error = Error> {
let username = jid.node.as_ref().unwrap().to_owned();
let jid1 = jid.clone();
let jid2 = jid.clone();
let password = password;
done(idna::domain_to_ascii(&jid.domain))
.map_err(|_| Error::Idna)
- .and_then(|domain|
- done(Connecter::from_lookup(&domain, Some("_xmpp-client._tcp"), 5222))
- )
+ .and_then(|domain| {
+ done(Connecter::from_lookup(
+ &domain,
+ Some("_xmpp-client._tcp"),
+ 5222,
+ ))
+ })
.flatten()
- .and_then(move |tcp_stream|
- xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_CLIENT.to_owned())
- ).and_then(|xmpp_stream| {
+ .and_then(move |tcp_stream| {
+ xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_CLIENT.to_owned())
+ })
+ .and_then(|xmpp_stream| {
if Self::can_starttls(&xmpp_stream) {
Ok(Self::starttls(xmpp_stream))
} else {
Err(Error::Protocol(ProtocolError::NoTls))
}
- }).flatten()
- .and_then(|tls_stream|
- XMPPStream::start(tls_stream, jid2, NS_JABBER_CLIENT.to_owned())
- ).and_then(move |xmpp_stream|
- done(Self::auth(xmpp_stream, username, password))
- // TODO: flatten?
- ).and_then(|auth| auth)
+ })
+ .flatten()
+ .and_then(|tls_stream| XMPPStream::start(tls_stream, jid2, NS_JABBER_CLIENT.to_owned()))
+ .and_then(
+ move |xmpp_stream| done(Self::auth(xmpp_stream, username, password)), // TODO: flatten?
+ )
+ .and_then(|auth| auth)
+ .and_then(|xmpp_stream| Self::bind(xmpp_stream))
.and_then(|xmpp_stream| {
- Self::bind(xmpp_stream)
- }).and_then(|xmpp_stream| {
// println!("Bound to {}", xmpp_stream.jid);
Ok(xmpp_stream)
})
}
fn can_starttls<S>(stream: &xmpp_stream::XMPPStream<S>) -> bool {
- stream.stream_features
+ stream
+ .stream_features
.get_child("starttls", NS_XMPP_TLS)
.is_some()
}
- fn starttls<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> StartTlsClient<S> {
+ fn starttls<S: AsyncRead + AsyncWrite>(
+ stream: xmpp_stream::XMPPStream<S>,
+ ) -> StartTlsClient<S> {
StartTlsClient::from_stream(stream)
}
- fn auth<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>, username: String, password: String) -> Result<ClientAuth<S>, Error> {
+ fn auth<S: AsyncRead + AsyncWrite>(
+ stream: xmpp_stream::XMPPStream<S>,
+ username: String,
+ password: String,
+ ) -> Result<ClientAuth<S>, Error> {
let creds = Credentials::default()
.with_username(username)
.with_password(password)
@@ -118,31 +129,25 @@ impl Stream for Client {
let state = replace(&mut self.state, ClientState::Invalid);
match state {
- ClientState::Invalid =>
- Err(Error::InvalidState),
- ClientState::Disconnected =>
- Ok(Async::Ready(None)),
- ClientState::Connecting(mut connect) => {
- match connect.poll() {
- Ok(Async::Ready(stream)) => {
- self.state = ClientState::Connected(stream);
- Ok(Async::Ready(Some(Event::Online)))
- },
- Ok(Async::NotReady) => {
- self.state = ClientState::Connecting(connect);
- Ok(Async::NotReady)
- },
- Err(e) =>
- Err(e),
+ ClientState::Invalid => Err(Error::InvalidState),
+ ClientState::Disconnected => Ok(Async::Ready(None)),
+ ClientState::Connecting(mut connect) => match connect.poll() {
+ Ok(Async::Ready(stream)) => {
+ self.state = ClientState::Connected(stream);
+ Ok(Async::Ready(Some(Event::Online)))
+ }
+ Ok(Async::NotReady) => {
+ self.state = ClientState::Connecting(connect);
+ Ok(Async::NotReady)
}
+ Err(e) => Err(e),
},
ClientState::Connected(mut stream) => {
// Poll sink
match stream.poll_complete() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(())) => (),
- Err(e) =>
- return Err(e)?,
+ Err(e) => return Err(e)?,
};
// Poll stream
@@ -151,20 +156,18 @@ impl Stream for Client {
// EOF
self.state = ClientState::Disconnected;
Ok(Async::Ready(Some(Event::Disconnected)))
- },
+ }
Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
self.state = ClientState::Connected(stream);
Ok(Async::Ready(Some(Event::Stanza(stanza))))
- },
- Ok(Async::NotReady) |
- Ok(Async::Ready(_)) => {
+ }
+ Ok(Async::NotReady) | Ok(Async::Ready(_)) => {
self.state = ClientState::Connected(stream);
Ok(Async::NotReady)
- },
- Err(e) =>
- Err(e)?,
+ }
+ Err(e) => Err(e)?,
}
- },
+ }
}
}
}
@@ -175,30 +178,23 @@ impl Sink for Client {
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
match self.state {
- ClientState::Connected(ref mut stream) =>
- match stream.start_send(Packet::Stanza(item)) {
- Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) =>
- Ok(AsyncSink::NotReady(stanza)),
- Ok(AsyncSink::NotReady(_)) =>
- panic!("Client.start_send with stanza but got something else back"),
- Ok(AsyncSink::Ready) => {
- Ok(AsyncSink::Ready)
- },
- Err(e) =>
- Err(e)?,
- },
- _ =>
- Ok(AsyncSink::NotReady(item)),
+ ClientState::Connected(ref mut stream) => match stream.start_send(Packet::Stanza(item))
+ {
+ Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) => Ok(AsyncSink::NotReady(stanza)),
+ Ok(AsyncSink::NotReady(_)) => {
+ panic!("Client.start_send with stanza but got something else back")
+ }
+ Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready),
+ Err(e) => Err(e)?,
+ },
+ _ => Ok(AsyncSink::NotReady(item)),
}
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
match self.state {
- ClientState::Connected(ref mut stream) =>
- stream.poll_complete()
- .map_err(|e| e.into()),
- _ =>
- Ok(Async::Ready(())),
+ ClientState::Connected(ref mut stream) => stream.poll_complete().map_err(|e| e.into()),
+ _ => Ok(Async::Ready(())),
}
}
}
@@ -1,11 +1,11 @@
+use futures::{sink, Async, Future, Poll, Stream};
use std::mem::replace;
-use futures::{Future, Poll, Async, sink, Stream};
use tokio_io::{AsyncRead, AsyncWrite};
use xmpp_parsers::component::Handshake;
use crate::xmpp_codec::Packet;
use crate::xmpp_stream::XMPPStream;
-use crate::{Error, AuthError};
+use crate::{AuthError, Error};
const NS_JABBER_COMPONENT_ACCEPT: &str = "jabber:component:accept";
@@ -29,7 +29,7 @@ impl<S: AsyncWrite> ComponentAuth<S> {
};
this.send(
stream,
- Handshake::from_password_and_stream_id(&password, &sid)
+ Handshake::from_password_and_stream_id(&password, &sid),
);
Ok(this)
}
@@ -50,45 +50,40 @@ impl<S: AsyncRead + AsyncWrite> Future for ComponentAuth<S> {
let state = replace(&mut self.state, ComponentAuthState::Invalid);
match state {
- ComponentAuthState::WaitSend(mut send) =>
- match send.poll() {
- Ok(Async::Ready(stream)) => {
- self.state = ComponentAuthState::WaitRecv(stream);
- self.poll()
- },
- Ok(Async::NotReady) => {
- self.state = ComponentAuthState::WaitSend(send);
- Ok(Async::NotReady)
- },
- Err(e) =>
- Err(e)?
- },
- ComponentAuthState::WaitRecv(mut stream) =>
- match stream.poll() {
- Ok(Async::Ready(Some(Packet::Stanza(ref stanza))))
- if stanza.is("handshake", NS_JABBER_COMPONENT_ACCEPT) =>
- {
- self.state = ComponentAuthState::Invalid;
- Ok(Async::Ready(stream))
- },
- Ok(Async::Ready(Some(Packet::Stanza(ref stanza))))
- if stanza.is("error", "http://etherx.jabber.org/streams") =>
- {
- Err(AuthError::ComponentFail.into())
- },
- Ok(Async::Ready(event)) => {
- println!("ComponentAuth ignore {:?}", event);
- Ok(Async::NotReady)
- },
- Ok(_) => {
- self.state = ComponentAuthState::WaitRecv(stream);
- Ok(Async::NotReady)
- },
- Err(e) =>
- Err(e)?
- },
- ComponentAuthState::Invalid =>
- unreachable!(),
+ ComponentAuthState::WaitSend(mut send) => match send.poll() {
+ Ok(Async::Ready(stream)) => {
+ self.state = ComponentAuthState::WaitRecv(stream);
+ self.poll()
+ }
+ Ok(Async::NotReady) => {
+ self.state = ComponentAuthState::WaitSend(send);
+ Ok(Async::NotReady)
+ }
+ Err(e) => Err(e)?,
+ },
+ ComponentAuthState::WaitRecv(mut stream) => match stream.poll() {
+ Ok(Async::Ready(Some(Packet::Stanza(ref stanza))))
+ if stanza.is("handshake", NS_JABBER_COMPONENT_ACCEPT) =>
+ {
+ self.state = ComponentAuthState::Invalid;
+ Ok(Async::Ready(stream))
+ }
+ Ok(Async::Ready(Some(Packet::Stanza(ref stanza))))
+ if stanza.is("error", "http://etherx.jabber.org/streams") =>
+ {
+ Err(AuthError::ComponentFail.into())
+ }
+ Ok(Async::Ready(event)) => {
+ println!("ComponentAuth ignore {:?}", event);
+ Ok(Async::NotReady)
+ }
+ Ok(_) => {
+ self.state = ComponentAuthState::WaitRecv(stream);
+ Ok(Async::NotReady)
+ }
+ Err(e) => Err(e)?,
+ },
+ ComponentAuthState::Invalid => unreachable!(),
}
}
}
@@ -1,18 +1,18 @@
//! Components in XMPP are services/gateways that are logged into an
//! XMPP server under a JID consisting of just a domain name. They are
//! allowed to use any user and resource identifiers in their stanzas.
+use futures::{done, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
+use jid::{Jid, JidParseError};
+use minidom::Element;
use std::mem::replace;
use std::str::FromStr;
use tokio::net::TcpStream;
use tokio_io::{AsyncRead, AsyncWrite};
-use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink, done};
-use minidom::Element;
-use jid::{Jid, JidParseError};
+use super::event::Event;
+use super::happy_eyeballs::Connecter;
use super::xmpp_codec::Packet;
use super::xmpp_stream;
-use super::happy_eyeballs::Connecter;
-use super::event::Event;
use super::Error;
mod auth;
@@ -31,7 +31,7 @@ const NS_JABBER_COMPONENT_ACCEPT: &str = "jabber:component:accept";
enum ComponentState {
Invalid,
Disconnected,
- Connecting(Box<Future<Item=XMPPStream, Error=Error>>),
+ Connecting(Box<Future<Item = XMPPStream, Error = Error>>),
Connected(XMPPStream),
}
@@ -50,19 +50,30 @@ impl Component {
})
}
- fn make_connect(jid: Jid, password: String, server: &str, port: u16) -> impl Future<Item=XMPPStream, Error=Error> {
+ fn make_connect(
+ jid: Jid,
+ password: String,
+ server: &str,
+ port: u16,
+ ) -> impl Future<Item = XMPPStream, Error = Error> {
let jid1 = jid.clone();
let password = password;
done(Connecter::from_lookup(server, None, port))
.flatten()
.and_then(move |tcp_stream| {
- xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_COMPONENT_ACCEPT.to_owned())
- }).and_then(move |xmpp_stream| {
- Self::auth(xmpp_stream, password).expect("auth")
+ xmpp_stream::XMPPStream::start(
+ tcp_stream,
+ jid1,
+ NS_JABBER_COMPONENT_ACCEPT.to_owned(),
+ )
})
+ .and_then(move |xmpp_stream| Self::auth(xmpp_stream, password).expect("auth"))
}
- fn auth<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>, password: String) -> Result<ComponentAuth<S>, Error> {
+ fn auth<S: AsyncRead + AsyncWrite>(
+ stream: xmpp_stream::XMPPStream<S>,
+ password: String,
+ ) -> Result<ComponentAuth<S>, Error> {
ComponentAuth::new(stream, password)
}
}
@@ -75,31 +86,25 @@ impl Stream for Component {
let state = replace(&mut self.state, ComponentState::Invalid);
match state {
- ComponentState::Invalid =>
- Err(Error::InvalidState),
- ComponentState::Disconnected =>
- Ok(Async::Ready(None)),
- ComponentState::Connecting(mut connect) => {
- match connect.poll() {
- Ok(Async::Ready(stream)) => {
- self.state = ComponentState::Connected(stream);
- Ok(Async::Ready(Some(Event::Online)))
- },
- Ok(Async::NotReady) => {
- self.state = ComponentState::Connecting(connect);
- Ok(Async::NotReady)
- },
- Err(e) =>
- Err(e),
+ ComponentState::Invalid => Err(Error::InvalidState),
+ ComponentState::Disconnected => Ok(Async::Ready(None)),
+ ComponentState::Connecting(mut connect) => match connect.poll() {
+ Ok(Async::Ready(stream)) => {
+ self.state = ComponentState::Connected(stream);
+ Ok(Async::Ready(Some(Event::Online)))
+ }
+ Ok(Async::NotReady) => {
+ self.state = ComponentState::Connecting(connect);
+ Ok(Async::NotReady)
}
+ Err(e) => Err(e),
},
ComponentState::Connected(mut stream) => {
// Poll sink
match stream.poll_complete() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(())) => (),
- Err(e) =>
- return Err(e)?,
+ Err(e) => return Err(e)?,
};
// Poll stream
@@ -107,24 +112,23 @@ impl Stream for Component {
Ok(Async::NotReady) => {
self.state = ComponentState::Connected(stream);
Ok(Async::NotReady)
- },
+ }
Ok(Async::Ready(None)) => {
// EOF
self.state = ComponentState::Disconnected;
Ok(Async::Ready(Some(Event::Disconnected)))
- },
+ }
Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
self.state = ComponentState::Connected(stream);
Ok(Async::Ready(Some(Event::Stanza(stanza))))
- },
+ }
Ok(Async::Ready(_)) => {
self.state = ComponentState::Connected(stream);
Ok(Async::NotReady)
- },
- Err(e) =>
- Err(e)?,
+ }
+ Err(e) => Err(e)?,
}
- },
+ }
}
}
}
@@ -135,30 +139,26 @@ impl Sink for Component {
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
match self.state {
- ComponentState::Connected(ref mut stream) =>
- match stream.start_send(Packet::Stanza(item)) {
- Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) =>
- Ok(AsyncSink::NotReady(stanza)),
- Ok(AsyncSink::NotReady(_)) =>
- panic!("Component.start_send with stanza but got something else back"),
- Ok(AsyncSink::Ready) => {
- Ok(AsyncSink::Ready)
- },
- Err(e) =>
- Err(e)?,
- },
- _ =>
- Ok(AsyncSink::NotReady(item)),
+ ComponentState::Connected(ref mut stream) => match stream
+ .start_send(Packet::Stanza(item))
+ {
+ Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) => Ok(AsyncSink::NotReady(stanza)),
+ Ok(AsyncSink::NotReady(_)) => {
+ panic!("Component.start_send with stanza but got something else back")
+ }
+ Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready),
+ Err(e) => Err(e)?,
+ },
+ _ => Ok(AsyncSink::NotReady(item)),
}
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
match &mut self.state {
- &mut ComponentState::Connected(ref mut stream) =>
- stream.poll_complete()
- .map_err(|e| e.into()),
- _ =>
- Ok(Async::Ready(())),
+ &mut ComponentState::Connected(ref mut stream) => {
+ stream.poll_complete().map_err(|e| e.into())
+ }
+ _ => Ok(Async::Ready(())),
}
}
}
@@ -1,11 +1,11 @@
-use std::io::Error as IoError;
-use std::error::Error as StdError;
-use std::str::Utf8Error;
+use native_tls::Error as TlsError;
use std::borrow::Cow;
+use std::error::Error as StdError;
use std::fmt;
-use native_tls::Error as TlsError;
-use trust_dns_resolver::error::ResolveError;
+use std::io::Error as IoError;
+use std::str::Utf8Error;
use trust_dns_proto::error::ProtoError;
+use trust_dns_resolver::error::ResolveError;
use xmpp_parsers::error::Error as ParsersError;
use xmpp_parsers::sasl::DefinedCondition as SaslDefinedCondition;
@@ -1,18 +1,18 @@
-use std::mem;
-use std::io::Error as IoError;
-use std::net::SocketAddr;
+use crate::{ConnecterError, Error};
+use futures::{Async, Future, Poll};
+use std::cell::RefCell;
use std::collections::BTreeMap;
use std::collections::VecDeque;
-use std::cell::RefCell;
-use futures::{Future, Poll, Async};
-use tokio::net::TcpStream;
+use std::io::Error as IoError;
+use std::mem;
+use std::net::SocketAddr;
use tokio::net::tcp::ConnectFuture;
-use trust_dns_resolver::{IntoName, Name, ResolverFuture, error::ResolveError};
+use tokio::net::TcpStream;
+use trust_dns_resolver::config::LookupIpStrategy;
use trust_dns_resolver::lookup::SrvLookupFuture;
use trust_dns_resolver::lookup_ip::LookupIpFuture;
use trust_dns_resolver::system_conf;
-use trust_dns_resolver::config::LookupIpStrategy;
-use crate::{Error, ConnecterError};
+use trust_dns_resolver::{error::ResolveError, IntoName, Name, ResolverFuture};
enum State {
AwaitResolver(Box<Future<Item = ResolverFuture, Error = ResolveError> + Send>),
@@ -31,23 +31,26 @@ pub struct Connecter {
error: Option<Error>,
}
-fn resolver_future() -> Result<Box<Future<Item = ResolverFuture, Error = ResolveError> + Send>, IoError> {
+fn resolver_future(
+) -> Result<Box<Future<Item = ResolverFuture, Error = ResolveError> + Send>, IoError> {
let (conf, mut opts) = system_conf::read_system_conf()?;
opts.ip_strategy = LookupIpStrategy::Ipv4AndIpv6;
Ok(ResolverFuture::new(conf, opts))
}
impl Connecter {
- pub fn from_lookup(domain: &str, srv: Option<&str>, fallback_port: u16) -> Result<Connecter, Error> {
+ pub fn from_lookup(
+ domain: &str,
+ srv: Option<&str>,
+ fallback_port: u16,
+ ) -> Result<Connecter, Error> {
if let Ok(ip) = domain.parse() {
// use specified IP address, not domain name, skip the whole dns part
- let connect =
- RefCell::new(TcpStream::connect(&SocketAddr::new(ip, fallback_port)));
+ let connect = RefCell::new(TcpStream::connect(&SocketAddr::new(ip, fallback_port)));
return Ok(Connecter {
fallback_port,
srv_domain: None,
- domain: "nohost".into_name()
- .map_err(ConnecterError::Dns)?,
+ domain: "nohost".into_name().map_err(ConnecterError::Dns)?,
state: State::Connecting(None, vec![connect]),
targets: VecDeque::new(),
error: None,
@@ -56,20 +59,18 @@ impl Connecter {
let state = State::AwaitResolver(resolver_future()?);
let srv_domain = match srv {
- Some(srv) =>
- Some(format!("{}.{}.", srv, domain)
- .into_name()
- .map_err(ConnecterError::Dns)?
- ),
- None =>
- None,
+ Some(srv) => Some(
+ format!("{}.{}.", srv, domain)
+ .into_name()
+ .map_err(ConnecterError::Dns)?,
+ ),
+ None => None,
};
Ok(Connecter {
fallback_port,
srv_domain,
- domain: domain.into_name()
- .map_err(ConnecterError::Dns)?,
+ domain: domain.into_name().map_err(ConnecterError::Dns)?,
state,
targets: VecDeque::new(),
error: None,
@@ -97,8 +98,8 @@ impl Future for Connecter {
self.state = State::ResolveSrv(resolver, srv_lookup);
}
None => {
- self.targets =
- [(self.domain.clone(), self.fallback_port)].into_iter()
+ self.targets = [(self.domain.clone(), self.fallback_port)]
+ .into_iter()
.cloned()
.collect();
self.state = State::Connecting(Some(resolver), vec![]);
@@ -115,22 +116,19 @@ impl Future for Connecter {
Ok(Async::NotReady)
}
Ok(Async::Ready(srv_result)) => {
- let srv_map: BTreeMap<_, _> =
- srv_result.iter()
+ let srv_map: BTreeMap<_, _> = srv_result
+ .iter()
.map(|srv| (srv.priority(), (srv.target().clone(), srv.port())))
.collect();
- let targets =
- srv_map.into_iter()
- .map(|(_, tp)| tp)
- .collect();
+ let targets = srv_map.into_iter().map(|(_, tp)| tp).collect();
self.targets = targets;
self.state = State::Connecting(Some(resolver), vec![]);
self.poll()
}
Err(_) => {
// ignore, fallback
- self.targets =
- [(self.domain.clone(), self.fallback_port)].into_iter()
+ self.targets = [(self.domain.clone(), self.fallback_port)]
+ .into_iter()
.cloned()
.collect();
self.state = State::Connecting(Some(resolver), vec![]);
@@ -147,36 +145,31 @@ impl Future for Connecter {
self.poll()
} else if connects.len() > 0 {
let mut success = None;
- connects.retain(|connect| {
- match connect.borrow_mut().poll() {
- Ok(Async::NotReady) => true,
- Ok(Async::Ready(connection)) => {
- success = Some(connection);
- false
+ connects.retain(|connect| match connect.borrow_mut().poll() {
+ Ok(Async::NotReady) => true,
+ Ok(Async::Ready(connection)) => {
+ success = Some(connection);
+ false
+ }
+ Err(e) => {
+ if self.error.is_none() {
+ self.error = Some(e.into());
}
- Err(e) => {
- if self.error.is_none() {
- self.error = Some(e.into());
- }
- false
- },
+ false
}
});
match success {
- Some(connection) =>
- Ok(Async::Ready(connection)),
+ Some(connection) => Ok(Async::Ready(connection)),
None => {
self.state = State::Connecting(resolver, connects);
Ok(Async::NotReady)
- },
+ }
}
} else {
// All targets tried
match self.error.take() {
- None =>
- Err(ConnecterError::AllFailed.into()),
- Some(e) =>
- Err(e),
+ None => Err(ConnecterError::AllFailed.into()),
+ Some(e) => Err(e),
}
}
}
@@ -187,8 +180,8 @@ impl Future for Connecter {
Ok(Async::NotReady)
}
Ok(Async::Ready(ip_result)) => {
- let connects =
- ip_result.iter()
+ let connects = ip_result
+ .iter()
.map(|ip| RefCell::new(TcpStream::connect(&SocketAddr::new(ip, port))))
.collect();
self.state = State::Connecting(Some(resolver), connects);
@@ -204,8 +197,7 @@ impl Future for Connecter {
}
}
}
- _ => panic!("")
+ _ => panic!(""),
}
}
}
-
@@ -5,17 +5,17 @@
#[macro_use]
extern crate derive_error;
+mod starttls;
+mod stream_start;
pub mod xmpp_codec;
pub mod xmpp_stream;
-mod stream_start;
-mod starttls;
pub use crate::starttls::StartTlsClient;
-mod happy_eyeballs;
mod event;
+mod happy_eyeballs;
pub use crate::event::Event;
mod client;
pub use crate::client::Client;
mod component;
pub use crate::component::Component;
mod error;
-pub use crate::error::{Error, ProtocolError, AuthError, ConnecterError, ParseError, ParserError};
+pub use crate::error::{AuthError, ConnecterError, Error, ParseError, ParserError, ProtocolError};
@@ -1,12 +1,12 @@
-use std::mem::replace;
-use futures::{Future, Sink, Poll, Async};
-use futures::stream::Stream;
use futures::sink;
-use tokio_io::{AsyncRead, AsyncWrite};
-use tokio_tls::{TlsStream, TlsConnector, Connect};
-use native_tls::TlsConnector as NativeTlsConnector;
-use minidom::Element;
+use futures::stream::Stream;
+use futures::{Async, Future, Poll, Sink};
use jid::Jid;
+use minidom::Element;
+use native_tls::TlsConnector as NativeTlsConnector;
+use std::mem::replace;
+use tokio_io::{AsyncRead, AsyncWrite};
+use tokio_tls::{Connect, TlsConnector, TlsStream};
use crate::xmpp_codec::Packet;
use crate::xmpp_stream::XMPPStream;
@@ -15,7 +15,6 @@ use crate::Error;
/// XMPP TLS XML namespace
pub const NS_XMPP_TLS: &str = "urn:ietf:params:xml:ns:xmpp-tls";
-
/// XMPP stream that switches to TLS if available in received features
pub struct StartTlsClient<S: AsyncRead + AsyncWrite> {
state: StartTlsClientState<S>,
@@ -34,9 +33,7 @@ impl<S: AsyncRead + AsyncWrite> StartTlsClient<S> {
pub fn from_stream(xmpp_stream: XMPPStream<S>) -> Self {
let jid = xmpp_stream.jid.clone();
- let nonza = Element::builder("starttls")
- .ns(NS_XMPP_TLS)
- .build();
+ let nonza = Element::builder("starttls").ns(NS_XMPP_TLS).build();
let packet = Packet::Stanza(nonza);
let send = xmpp_stream.send(packet);
@@ -56,51 +53,56 @@ impl<S: AsyncRead + AsyncWrite> Future for StartTlsClient<S> {
let mut retry = false;
let (new_state, result) = match old_state {
- StartTlsClientState::SendStartTls(mut send) =>
- match send.poll() {
- Ok(Async::Ready(xmpp_stream)) => {
- let new_state = StartTlsClientState::AwaitProceed(xmpp_stream);
- retry = true;
- (new_state, Ok(Async::NotReady))
- },
- Ok(Async::NotReady) =>
- (StartTlsClientState::SendStartTls(send), Ok(Async::NotReady)),
- Err(e) =>
- (StartTlsClientState::SendStartTls(send), Err(e.into())),
- },
- StartTlsClientState::AwaitProceed(mut xmpp_stream) =>
- match xmpp_stream.poll() {
- Ok(Async::Ready(Some(Packet::Stanza(ref stanza))))
- if stanza.name() == "proceed" =>
- {
- let stream = xmpp_stream.stream.into_inner();
- let connect = TlsConnector::from(NativeTlsConnector::builder()
- .build().unwrap())
+ StartTlsClientState::SendStartTls(mut send) => match send.poll() {
+ Ok(Async::Ready(xmpp_stream)) => {
+ let new_state = StartTlsClientState::AwaitProceed(xmpp_stream);
+ retry = true;
+ (new_state, Ok(Async::NotReady))
+ }
+ Ok(Async::NotReady) => {
+ (StartTlsClientState::SendStartTls(send), Ok(Async::NotReady))
+ }
+ Err(e) => (StartTlsClientState::SendStartTls(send), Err(e.into())),
+ },
+ StartTlsClientState::AwaitProceed(mut xmpp_stream) => match xmpp_stream.poll() {
+ Ok(Async::Ready(Some(Packet::Stanza(ref stanza))))
+ if stanza.name() == "proceed" =>
+ {
+ let stream = xmpp_stream.stream.into_inner();
+ let connect =
+ TlsConnector::from(NativeTlsConnector::builder().build().unwrap())
.connect(&self.jid.domain, stream);
- let new_state = StartTlsClientState::StartingTls(connect);
- retry = true;
- (new_state, Ok(Async::NotReady))
- },
- Ok(Async::Ready(value)) => {
- println!("StartTlsClient ignore {:?}", value);
- (StartTlsClientState::AwaitProceed(xmpp_stream), Ok(Async::NotReady))
- },
- Ok(_) =>
- (StartTlsClientState::AwaitProceed(xmpp_stream), Ok(Async::NotReady)),
- Err(e) =>
- (StartTlsClientState::AwaitProceed(xmpp_stream), Err(Error::Protocol(e.into()))),
- },
- StartTlsClientState::StartingTls(mut connect) =>
- match connect.poll() {
- Ok(Async::Ready(tls_stream)) =>
- (StartTlsClientState::Invalid, Ok(Async::Ready(tls_stream))),
- Ok(Async::NotReady) =>
- (StartTlsClientState::StartingTls(connect), Ok(Async::NotReady)),
- Err(e) =>
- (StartTlsClientState::Invalid, Err(e.into())),
- },
- StartTlsClientState::Invalid =>
- unreachable!(),
+ let new_state = StartTlsClientState::StartingTls(connect);
+ retry = true;
+ (new_state, Ok(Async::NotReady))
+ }
+ Ok(Async::Ready(value)) => {
+ println!("StartTlsClient ignore {:?}", value);
+ (
+ StartTlsClientState::AwaitProceed(xmpp_stream),
+ Ok(Async::NotReady),
+ )
+ }
+ Ok(_) => (
+ StartTlsClientState::AwaitProceed(xmpp_stream),
+ Ok(Async::NotReady),
+ ),
+ Err(e) => (
+ StartTlsClientState::AwaitProceed(xmpp_stream),
+ Err(Error::Protocol(e.into())),
+ ),
+ },
+ StartTlsClientState::StartingTls(mut connect) => match connect.poll() {
+ Ok(Async::Ready(tls_stream)) => {
+ (StartTlsClientState::Invalid, Ok(Async::Ready(tls_stream)))
+ }
+ Ok(Async::NotReady) => (
+ StartTlsClientState::StartingTls(connect),
+ Ok(Async::NotReady),
+ ),
+ Err(e) => (StartTlsClientState::Invalid, Err(e.into())),
+ },
+ StartTlsClientState::Invalid => unreachable!(),
};
self.state = new_state;
@@ -1,11 +1,11 @@
-use std::mem::replace;
-use futures::{Future, Async, Poll, Stream, sink, Sink};
-use tokio_io::{AsyncRead, AsyncWrite};
-use tokio_codec::Framed;
+use futures::{sink, Async, Future, Poll, Sink, Stream};
use jid::Jid;
use minidom::Element;
+use std::mem::replace;
+use tokio_codec::Framed;
+use tokio_io::{AsyncRead, AsyncWrite};
-use crate::xmpp_codec::{XMPPCodec, Packet};
+use crate::xmpp_codec::{Packet, XMPPCodec};
use crate::xmpp_stream::XMPPStream;
use crate::{Error, ProtocolError};
@@ -26,11 +26,15 @@ enum StreamStartState<S: AsyncWrite> {
impl<S: AsyncWrite> StreamStart<S> {
pub fn from_stream(stream: Framed<S, XMPPCodec>, jid: Jid, ns: String) -> Self {
- let attrs = [("to".to_owned(), jid.domain.clone()),
- ("version".to_owned(), "1.0".to_owned()),
- ("xmlns".to_owned(), ns.clone()),
- ("xmlns:stream".to_owned(), NS_XMPP_STREAM.to_owned()),
- ].iter().cloned().collect();
+ let attrs = [
+ ("to".to_owned(), jid.domain.clone()),
+ ("version".to_owned(), "1.0".to_owned()),
+ ("xmlns".to_owned(), ns.clone()),
+ ("xmlns:stream".to_owned(), NS_XMPP_STREAM.to_owned()),
+ ]
+ .iter()
+ .cloned()
+ .collect();
let send = stream.send(Packet::StreamStart(attrs));
StreamStart {
@@ -50,59 +54,66 @@ impl<S: AsyncRead + AsyncWrite> Future for StreamStart<S> {
let mut retry = false;
let (new_state, result) = match old_state {
- StreamStartState::SendStart(mut send) =>
- match send.poll() {
- Ok(Async::Ready(stream)) => {
+ StreamStartState::SendStart(mut send) => match send.poll() {
+ Ok(Async::Ready(stream)) => {
+ retry = true;
+ (StreamStartState::RecvStart(stream), Ok(Async::NotReady))
+ }
+ Ok(Async::NotReady) => (StreamStartState::SendStart(send), Ok(Async::NotReady)),
+ Err(e) => (StreamStartState::Invalid, Err(e.into())),
+ },
+ StreamStartState::RecvStart(mut stream) => match stream.poll() {
+ Ok(Async::Ready(Some(Packet::StreamStart(stream_attrs)))) => {
+ let stream_ns = stream_attrs
+ .get("xmlns")
+ .ok_or(ProtocolError::NoStreamNamespace)?
+ .clone();
+ if self.ns == "jabber:client" {
retry = true;
- (StreamStartState::RecvStart(stream), Ok(Async::NotReady))
- },
- Ok(Async::NotReady) =>
- (StreamStartState::SendStart(send), Ok(Async::NotReady)),
- Err(e) =>
- (StreamStartState::Invalid, Err(e.into())),
- },
- StreamStartState::RecvStart(mut stream) =>
- match stream.poll() {
- Ok(Async::Ready(Some(Packet::StreamStart(stream_attrs)))) => {
- let stream_ns = stream_attrs.get("xmlns")
- .ok_or(ProtocolError::NoStreamNamespace)?
+ // TODO: skip RecvFeatures for version < 1.0
+ (
+ StreamStartState::RecvFeatures(stream, stream_ns),
+ Ok(Async::NotReady),
+ )
+ } else {
+ let id = stream_attrs
+ .get("id")
+ .ok_or(ProtocolError::NoStreamId)?
.clone();
- if self.ns == "jabber:client" {
- retry = true;
- // TODO: skip RecvFeatures for version < 1.0
- (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady))
- } else {
- let id = stream_attrs.get("id")
- .ok_or(ProtocolError::NoStreamId)?
- .clone();
- // FIXME: huge hack, shouldn’t be an element!
- let stream = XMPPStream::new(self.jid.clone(), stream, self.ns.clone(), Element::builder(id).build());
- (StreamStartState::Invalid, Ok(Async::Ready(stream)))
- }
- },
- Ok(Async::Ready(_)) =>
- return Err(ProtocolError::InvalidToken.into()),
- Ok(Async::NotReady) =>
- (StreamStartState::RecvStart(stream), Ok(Async::NotReady)),
- Err(e) =>
- return Err(ProtocolError::from(e).into()),
- },
- StreamStartState::RecvFeatures(mut stream, stream_ns) =>
- match stream.poll() {
- Ok(Async::Ready(Some(Packet::Stanza(stanza)))) =>
- if stanza.is("features", NS_XMPP_STREAM) {
- let stream = XMPPStream::new(self.jid.clone(), stream, self.ns.clone(), stanza);
- (StreamStartState::Invalid, Ok(Async::Ready(stream)))
- } else {
- (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady))
- },
- Ok(Async::Ready(_)) | Ok(Async::NotReady) =>
- (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady)),
- Err(e) =>
- return Err(ProtocolError::from(e).into()),
- },
- StreamStartState::Invalid =>
- unreachable!(),
+ // FIXME: huge hack, shouldn’t be an element!
+ let stream = XMPPStream::new(
+ self.jid.clone(),
+ stream,
+ self.ns.clone(),
+ Element::builder(id).build(),
+ );
+ (StreamStartState::Invalid, Ok(Async::Ready(stream)))
+ }
+ }
+ Ok(Async::Ready(_)) => return Err(ProtocolError::InvalidToken.into()),
+ Ok(Async::NotReady) => (StreamStartState::RecvStart(stream), Ok(Async::NotReady)),
+ Err(e) => return Err(ProtocolError::from(e).into()),
+ },
+ StreamStartState::RecvFeatures(mut stream, stream_ns) => match stream.poll() {
+ Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
+ if stanza.is("features", NS_XMPP_STREAM) {
+ let stream =
+ XMPPStream::new(self.jid.clone(), stream, self.ns.clone(), stanza);
+ (StreamStartState::Invalid, Ok(Async::Ready(stream)))
+ } else {
+ (
+ StreamStartState::RecvFeatures(stream, stream_ns),
+ Ok(Async::NotReady),
+ )
+ }
+ }
+ Ok(Async::Ready(_)) | Ok(Async::NotReady) => (
+ StreamStartState::RecvFeatures(stream, stream_ns),
+ Ok(Async::NotReady),
+ ),
+ Err(e) => return Err(ProtocolError::from(e).into()),
+ },
+ StreamStartState::Invalid => unreachable!(),
};
self.state = new_state;
@@ -1,22 +1,22 @@
//! XML stream parser for XMPP
+use crate::{ParseError, ParserError};
+use bytes::{BufMut, BytesMut};
+use minidom::Element;
+use quick_xml::Writer as EventWriter;
use std;
+use std::cell::RefCell;
+use std::collections::vec_deque::VecDeque;
+use std::collections::HashMap;
use std::default::Default;
+use std::fmt::Write;
+use std::io;
use std::iter::FromIterator;
-use std::cell::RefCell;
use std::rc::Rc;
-use std::fmt::Write;
use std::str::from_utf8;
-use std::io;
-use std::collections::HashMap;
-use std::collections::vec_deque::VecDeque;
-use tokio_codec::{Encoder, Decoder};
-use minidom::Element;
-use xml5ever::tokenizer::{XmlTokenizer, TokenSink, Token, Tag, TagKind};
+use tokio_codec::{Decoder, Encoder};
use xml5ever::interface::Attribute;
-use bytes::{BytesMut, BufMut};
-use quick_xml::Writer as EventWriter;
-use crate::{ParserError, ParseError};
+use xml5ever::tokenizer::{Tag, TagKind, Token, TokenSink, XmlTokenizer};
/// Anything that can be sent or received on an XMPP/XML stream
#[derive(Debug)]
@@ -72,17 +72,21 @@ impl ParserSink {
fn handle_start_tag(&mut self, tag: Tag) {
let mut nss = HashMap::new();
- let is_prefix_xmlns = |attr: &Attribute| attr.name.prefix.as_ref()
- .map(|prefix| prefix.eq_str_ignore_ascii_case("xmlns"))
- .unwrap_or(false);
+ let is_prefix_xmlns = |attr: &Attribute| {
+ attr.name
+ .prefix
+ .as_ref()
+ .map(|prefix| prefix.eq_str_ignore_ascii_case("xmlns"))
+ .unwrap_or(false)
+ };
for attr in &tag.attrs {
match attr.name.local.as_ref() {
"xmlns" => {
nss.insert(None, attr.value.as_ref().to_owned());
- },
+ }
prefix if is_prefix_xmlns(attr) => {
- nss.insert(Some(prefix.to_owned()), attr.value.as_ref().to_owned());
- },
+ nss.insert(Some(prefix.to_owned()), attr.value.as_ref().to_owned());
+ }
_ => (),
}
}
@@ -90,10 +94,9 @@ impl ParserSink {
let el = {
let mut el_builder = Element::builder(tag.name.local.as_ref());
- if let Some(el_ns) = self.lookup_ns(
- &tag.name.prefix.map(|prefix|
- prefix.as_ref().to_owned())
- ) {
+ if let Some(el_ns) =
+ self.lookup_ns(&tag.name.prefix.map(|prefix| prefix.as_ref().to_owned()))
+ {
el_builder = el_builder.ns(el_ns);
}
for attr in &tag.attrs {
@@ -101,21 +104,20 @@ impl ParserSink {
"xmlns" => (),
_ if is_prefix_xmlns(attr) => (),
_ => {
- el_builder = el_builder.attr(
- attr.name.local.as_ref(),
- attr.value.as_ref()
- );
- },
+ el_builder = el_builder.attr(attr.name.local.as_ref(), attr.value.as_ref());
+ }
}
}
el_builder.build()
};
if self.stack.is_empty() {
- let attrs = HashMap::from_iter(
- tag.attrs.iter()
- .map(|attr| (attr.name.local.as_ref().to_owned(), attr.value.as_ref().to_owned()))
- );
+ let attrs = HashMap::from_iter(tag.attrs.iter().map(|attr| {
+ (
+ attr.name.local.as_ref().to_owned(),
+ attr.value.as_ref().to_owned(),
+ )
+ }));
self.push_queue(Packet::StreamStart(attrs));
}
@@ -128,15 +130,13 @@ impl ParserSink {
match self.stack.len() {
// </stream:stream>
- 0 =>
- self.push_queue(Packet::StreamEnd),
+ 0 => self.push_queue(Packet::StreamEnd),
// </stanza>
- 1 =>
- self.push_queue(Packet::Stanza(el)),
+ 1 => self.push_queue(Packet::Stanza(el)),
len => {
let parent = &mut self.stack[len - 1];
parent.append_child(el);
- },
+ }
}
}
}
@@ -145,32 +145,26 @@ impl TokenSink for ParserSink {
fn process_token(&mut self, token: Token) {
match token {
Token::TagToken(tag) => match tag.kind {
- TagKind::StartTag =>
- self.handle_start_tag(tag),
- TagKind::EndTag =>
- self.handle_end_tag(),
+ TagKind::StartTag => self.handle_start_tag(tag),
+ TagKind::EndTag => self.handle_end_tag(),
TagKind::EmptyTag => {
self.handle_start_tag(tag);
self.handle_end_tag();
- },
- TagKind::ShortTag =>
- self.push_queue_error(ParserError::ShortTag),
+ }
+ TagKind::ShortTag => self.push_queue_error(ParserError::ShortTag),
},
- Token::CharacterTokens(tendril) =>
- match self.stack.len() {
- 0 | 1 =>
- self.push_queue(Packet::Text(tendril.into())),
- len => {
- let el = &mut self.stack[len - 1];
- el.append_text_node(tendril);
- },
- },
- Token::EOFToken =>
- self.push_queue(Packet::StreamEnd),
+ Token::CharacterTokens(tendril) => match self.stack.len() {
+ 0 | 1 => self.push_queue(Packet::Text(tendril.into())),
+ len => {
+ let el = &mut self.stack[len - 1];
+ el.append_text_node(tendril);
+ }
+ },
+ Token::EOFToken => self.push_queue(Packet::StreamEnd),
Token::ParseError(s) => {
// println!("ParseError: {:?}", s);
self.push_queue_error(ParserError::Parse(ParseError(s)));
- },
+ }
_ => (),
}
}
@@ -219,23 +213,22 @@ impl Decoder for XMPPCodec {
type Error = ParserError;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
- let buf1: Box<AsRef<[u8]>> =
- if ! self.buf.is_empty() && ! buf.is_empty() {
- let mut prefix = std::mem::replace(&mut self.buf, vec![]);
- prefix.extend_from_slice(buf.take().as_ref());
- Box::new(prefix)
- } else {
- Box::new(buf.take())
- };
+ let buf1: Box<AsRef<[u8]>> = if !self.buf.is_empty() && !buf.is_empty() {
+ let mut prefix = std::mem::replace(&mut self.buf, vec![]);
+ prefix.extend_from_slice(buf.take().as_ref());
+ Box::new(prefix)
+ } else {
+ Box::new(buf.take())
+ };
let buf1 = buf1.as_ref().as_ref();
match from_utf8(buf1) {
Ok(s) => {
- if ! s.is_empty() {
+ if !s.is_empty() {
// println!("<< {}", s);
let tendril = FromIterator::from_iter(s.chars());
self.parser.feed(tendril);
}
- },
+ }
// Remedies for truncated utf8
Err(e) if e.valid_up_to() >= buf1.len() - 3 => {
// Prepare all the valid data
@@ -249,17 +242,16 @@ impl Decoder for XMPPCodec {
self.buf.extend_from_slice(&buf1[e.valid_up_to()..]);
return result;
- },
+ }
Err(e) => {
// println!("error {} at {}/{} in {:?}", e, e.valid_up_to(), buf1.len(), buf1);
return Err(ParserError::Utf8(e));
- },
+ }
}
match self.queue.borrow_mut().pop_front() {
None => Ok(None),
- Some(result) =>
- result.map(|pkt| Some(pkt)),
+ Some(result) => result.map(|pkt| Some(pkt)),
}
}
@@ -284,8 +276,7 @@ impl Encoder for XMPPCodec {
let mut buf = String::new();
write!(buf, "<stream:stream").unwrap();
for (name, value) in start_attrs {
- write!(buf, " {}=\"{}\"", escape(&name), escape(&value))
- .unwrap();
+ write!(buf, " {}=\"{}\"", escape(&name), escape(&value)).unwrap();
if name == "xmlns" {
self.ns = Some(value);
}
@@ -293,17 +284,17 @@ impl Encoder for XMPPCodec {
write!(buf, ">\n").unwrap();
print!(">> {}", buf);
- write!(dst, "{}", buf)
- .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
- },
+ write!(dst, "{}", buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
+ }
Packet::Stanza(stanza) => {
- stanza.write_to_inner(&mut EventWriter::new(WriteBytes::new(dst)))
+ stanza
+ .write_to_inner(&mut EventWriter::new(WriteBytes::new(dst)))
.and_then(|_| {
// println!(">> {:?}", dst);
Ok(())
})
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("{}", e)))
- },
+ }
Packet::Text(text) => {
write_text(&text, dst)
.and_then(|_| {
@@ -311,9 +302,9 @@ impl Encoder for XMPPCodec {
Ok(())
})
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("{}", e)))
- },
+ }
// TODO: Implement all
- _ => Ok(())
+ _ => Ok(()),
}
}
}
@@ -334,7 +325,7 @@ pub fn escape(input: &str) -> String {
'>' => result.push_str(">"),
'\'' => result.push_str("'"),
'"' => result.push_str("""),
- o => result.push(o)
+ o => result.push(o),
}
}
result
@@ -364,7 +355,6 @@ impl<'a> std::io::Write for WriteBytes<'a> {
}
}
-
#[cfg(test)]
mod tests {
use super::*;
@@ -405,10 +395,7 @@ mod tests {
b.put(r">");
let r = c.decode(&mut b);
assert!(match r {
- Ok(Some(Packet::Stanza(ref el)))
- if el.name() == "test"
- && el.text() == "ß"
- => true,
+ Ok(Some(Packet::Stanza(ref el))) if el.name() == "test" && el.text() == "ß" => true,
_ => false,
});
}
@@ -436,10 +423,7 @@ mod tests {
b.put(&b"\x9f</test>"[..]);
let r = c.decode(&mut b);
assert!(match r {
- Ok(Some(Packet::Stanza(ref el)))
- if el.name() == "test"
- && el.text() == "ß"
- => true,
+ Ok(Some(Packet::Stanza(ref el))) if el.name() == "test" && el.text() == "ß" => true,
_ => false,
});
}
@@ -447,8 +431,8 @@ mod tests {
/// By default, encode() only get's a BytesMut that has 8kb space reserved.
#[test]
fn test_large_stanza() {
- use std::io::Cursor;
use futures::{Future, Sink};
+ use std::io::Cursor;
use tokio_codec::FramedWrite;
let framed = FramedWrite::new(Cursor::new(vec![]), XMPPCodec::new());
let mut text = "".to_owned();
@@ -456,15 +440,12 @@ mod tests {
text = text + "A";
}
let stanza = Element::builder("message")
- .append(
- Element::builder("body")
- .append(&text)
- .build()
- )
+ .append(Element::builder("body").append(&text).build())
.build();
- let framed = framed.send(Packet::Stanza(stanza))
- .wait()
- .expect("send");
- assert_eq!(framed.get_ref().get_ref(), &("<message><body>".to_owned() + &text + "</body></message>").as_bytes());
+ let framed = framed.send(Packet::Stanza(stanza)).wait().expect("send");
+ assert_eq!(
+ framed.get_ref().get_ref(),
+ &("<message><body>".to_owned() + &text + "</body></message>").as_bytes()
+ );
}
}
@@ -1,14 +1,14 @@
//! `XMPPStream` is the common container for all XMPP network connections
-use futures::{Poll, Stream, Sink, StartSend};
use futures::sink::Send;
-use tokio_io::{AsyncRead, AsyncWrite};
-use tokio_codec::Framed;
-use minidom::Element;
+use futures::{Poll, Sink, StartSend, Stream};
use jid::Jid;
+use minidom::Element;
+use tokio_codec::Framed;
+use tokio_io::{AsyncRead, AsyncWrite};
-use crate::xmpp_codec::{XMPPCodec, Packet};
use crate::stream_start::StreamStart;
+use crate::xmpp_codec::{Packet, XMPPCodec};
/// <stream:stream> namespace
pub const NS_XMPP_STREAM: &str = "http://etherx.jabber.org/streams";
@@ -30,11 +30,18 @@ pub struct XMPPStream<S> {
impl<S: AsyncRead + AsyncWrite> XMPPStream<S> {
/// Constructor
- pub fn new(jid: Jid,
- stream: Framed<S, XMPPCodec>,
- ns: String,
- stream_features: Element) -> Self {
- XMPPStream { jid, stream, stream_features, ns }
+ pub fn new(
+ jid: Jid,
+ stream: Framed<S, XMPPCodec>,
+ ns: String,
+ stream_features: Element,
+ ) -> Self {
+ XMPPStream {
+ jid,
+ stream,
+ stream_features,
+ ns,
+ }
}
/// Send a `<stream:stream>` start tag