diff --git a/examples/echo_bot.rs b/examples/echo_bot.rs index 67cbfbb35f98097ac7f7750a4cce612fb27b0967..80ffc97fcc142e4e035d3488abb05379a1c53008 100644 --- a/examples/echo_bot.rs +++ b/examples/echo_bot.rs @@ -1,13 +1,13 @@ +use futures::{future, Sink, Stream}; +use jid::Jid; +use minidom::Element; use std::env::args; use std::process::exit; -use try_from::TryFrom; -use futures::{Stream, Sink, future}; use tokio::runtime::current_thread::Runtime; use tokio_xmpp::Client; -use minidom::Element; -use xmpp_parsers::presence::{Presence, Type as PresenceType, Show as PresenceShow}; -use xmpp_parsers::message::{Message, MessageType, Body}; -use jid::Jid; +use try_from::TryFrom; +use xmpp_parsers::message::{Body, Message, MessageType}; +use xmpp_parsers::presence::{Presence, Show as PresenceShow, Type as PresenceType}; fn main() { let args: Vec = args().collect(); @@ -38,16 +38,18 @@ fn main() { let presence = make_presence(); send(presence); - } else if let Some(message) = event.into_stanza() + } else if let Some(message) = event + .into_stanza() .and_then(|stanza| Message::try_from(stanza).ok()) { // This is a message we'll echo match (message.from, message.bodies.get("")) { - (Some(from), Some(body)) => + (Some(from), Some(body)) => { if message.type_ != MessageType::Error { let reply = make_reply(from, &body.0); send(reply); - }, + } + } _ => (), } } @@ -69,7 +71,9 @@ fn main() { fn make_presence() -> Element { let mut presence = Presence::new(PresenceType::None); presence.show = PresenceShow::Chat; - presence.statuses.insert(String::from("en"), String::from("Echoing messages.")); + presence + .statuses + .insert(String::from("en"), String::from("Echoing messages.")); presence.into() } diff --git a/examples/echo_component.rs b/examples/echo_component.rs index b9e24265dafb60473b32af6a00ae35f84ff8cf33..348b06a4d2d812303943b996e514a966b945f610 100644 --- a/examples/echo_component.rs +++ b/examples/echo_component.rs @@ -1,14 +1,14 @@ +use futures::{future, Sink, Stream}; +use jid::Jid; +use minidom::Element; use std::env::args; use std::process::exit; use std::str::FromStr; -use try_from::TryFrom; use tokio::runtime::current_thread::Runtime; -use futures::{Stream, Sink, future}; use tokio_xmpp::Component; -use minidom::Element; -use xmpp_parsers::presence::{Presence, Type as PresenceType, Show as PresenceShow}; -use xmpp_parsers::message::{Message, MessageType, Body}; -use jid::Jid; +use try_from::TryFrom; +use xmpp_parsers::message::{Body, Message, MessageType}; +use xmpp_parsers::presence::{Presence, Show as PresenceShow, Type as PresenceType}; fn main() { let args: Vec = args().collect(); @@ -18,7 +18,11 @@ fn main() { } let jid = &args[1]; let password = &args[2]; - let server = &args.get(3).unwrap().parse().unwrap_or("127.0.0.1".to_owned()); + let server = &args + .get(3) + .unwrap() + .parse() + .unwrap_or("127.0.0.1".to_owned()); let port: u16 = args.get(4).unwrap().parse().unwrap_or(5347u16); // tokio_core context @@ -42,18 +46,23 @@ fn main() { println!("Online!"); // TODO: replace these hardcoded JIDs - let presence = make_presence(Jid::from_str("test@component.linkmauve.fr/coucou").unwrap(), Jid::from_str("linkmauve@linkmauve.fr").unwrap()); + let presence = make_presence( + Jid::from_str("test@component.linkmauve.fr/coucou").unwrap(), + Jid::from_str("linkmauve@linkmauve.fr").unwrap(), + ); send(presence); - } else if let Some(message) = event.into_stanza() + } else if let Some(message) = event + .into_stanza() .and_then(|stanza| Message::try_from(stanza).ok()) { // This is a message we'll echo match (message.from, message.bodies.get("")) { - (Some(from), Some(body)) => + (Some(from), Some(body)) => { if message.type_ != MessageType::Error { let reply = make_reply(from, &body.0); send(reply); - }, + } + } _ => (), } } @@ -77,7 +86,9 @@ fn make_presence(from: Jid, to: Jid) -> Element { presence.from = Some(from); presence.to = Some(to); presence.show = PresenceShow::Chat; - presence.statuses.insert(String::from("en"), String::from("Echoing messages.")); + presence + .statuses + .insert(String::from("en"), String::from("Echoing messages.")); presence.into() } diff --git a/src/client/auth.rs b/src/client/auth.rs index 2efb7071ae1a7ef958dc64c7a51e490a824b73e7..eadceb02a63c873d607cd6b5efc25cecb3dbba6d 100644 --- a/src/client/auth.rs +++ b/src/client/auth.rs @@ -1,23 +1,22 @@ +use futures::{sink, Async, Future, Poll, Stream}; +use minidom::Element; +use sasl::client::mechanisms::{Anonymous, Plain, Scram}; +use sasl::client::Mechanism; +use sasl::common::scram::{Sha1, Sha256}; +use sasl::common::Credentials; use std::mem::replace; use std::str::FromStr; -use futures::{Future, Poll, Async, sink, Stream}; use tokio_io::{AsyncRead, AsyncWrite}; -use sasl::common::Credentials; -use sasl::common::scram::{Sha1, Sha256}; -use sasl::client::Mechanism; -use sasl::client::mechanisms::{Scram, Plain, Anonymous}; -use minidom::Element; -use xmpp_parsers::sasl::{Auth, Challenge, Response, Success, Failure, Mechanism as XMPPMechanism}; use try_from::TryFrom; +use xmpp_parsers::sasl::{Auth, Challenge, Failure, Mechanism as XMPPMechanism, Response, Success}; +use crate::stream_start::StreamStart; use crate::xmpp_codec::Packet; use crate::xmpp_stream::XMPPStream; -use crate::stream_start::StreamStart; -use crate::{Error, AuthError, ProtocolError}; +use crate::{AuthError, Error, ProtocolError}; const NS_XMPP_SASL: &str = "urn:ietf:params:xml:ns:xmpp-sasl"; - pub struct ClientAuth { state: ClientAuthState, mechanism: Box, @@ -39,8 +38,9 @@ impl ClientAuth { Box::new(Anonymous::new()), ]; - let mech_names: Vec = - stream.stream_features.get_child("mechanisms", NS_XMPP_SASL) + let mech_names: Vec = stream + .stream_features + .get_child("mechanisms", NS_XMPP_SASL) .ok_or(AuthError::NoMechanism)? .children() .filter(|child| child.is("mechanism", NS_XMPP_SASL)) @@ -52,20 +52,18 @@ impl ClientAuth { let name = mech.name().to_owned(); if mech_names.iter().any(|name1| *name1 == name) { // println!("SASL mechanism selected: {:?}", name); - let initial = mech.initial() - .map_err(AuthError::Sasl)?; + let initial = mech.initial().map_err(AuthError::Sasl)?; let mut this = ClientAuth { state: ClientAuthState::Invalid, mechanism: mech, }; - let mechanism = XMPPMechanism::from_str(&name) - .map_err(ProtocolError::Parsers)?; + let mechanism = XMPPMechanism::from_str(&name).map_err(ProtocolError::Parsers)?; this.send( stream, Auth { mechanism, data: initial, - } + }, ); return Ok(this); } @@ -89,61 +87,55 @@ impl Future for ClientAuth { let state = replace(&mut self.state, ClientAuthState::Invalid); match state { - ClientAuthState::WaitSend(mut send) => - match send.poll() { - Ok(Async::Ready(stream)) => { - self.state = ClientAuthState::WaitRecv(stream); + ClientAuthState::WaitSend(mut send) => match send.poll() { + Ok(Async::Ready(stream)) => { + self.state = ClientAuthState::WaitRecv(stream); + self.poll() + } + Ok(Async::NotReady) => { + self.state = ClientAuthState::WaitSend(send); + Ok(Async::NotReady) + } + Err(e) => Err(e)?, + }, + ClientAuthState::WaitRecv(mut stream) => match stream.poll() { + Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => { + if let Ok(challenge) = Challenge::try_from(stanza.clone()) { + let response = self + .mechanism + .response(&challenge.data) + .map_err(AuthError::Sasl)?; + self.send(stream, Response { data: response }); self.poll() - }, - Ok(Async::NotReady) => { - self.state = ClientAuthState::WaitSend(send); - Ok(Async::NotReady) - }, - Err(e) => - Err(e)?, - }, - ClientAuthState::WaitRecv(mut stream) => - match stream.poll() { - Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => { - if let Ok(challenge) = Challenge::try_from(stanza.clone()) { - let response = self.mechanism.response(&challenge.data) - .map_err(AuthError::Sasl)?; - self.send(stream, Response { data: response }); - self.poll() - } else if let Ok(_) = Success::try_from(stanza.clone()) { - let start = stream.restart(); - self.state = ClientAuthState::Start(start); - self.poll() - } else if let Ok(failure) = Failure::try_from(stanza) { - Err(AuthError::Fail(failure.defined_condition))? - } else { - Ok(Async::NotReady) - } - } - Ok(Async::Ready(_event)) => { - // println!("ClientAuth ignore {:?}", _event); - Ok(Async::NotReady) - }, - Ok(_) => { - self.state = ClientAuthState::WaitRecv(stream); - Ok(Async::NotReady) - }, - Err(e) => - Err(ProtocolError::Parser(e))? - }, - ClientAuthState::Start(mut start) => - match start.poll() { - Ok(Async::Ready(stream)) => - Ok(Async::Ready(stream)), - Ok(Async::NotReady) => { + } else if let Ok(_) = Success::try_from(stanza.clone()) { + let start = stream.restart(); self.state = ClientAuthState::Start(start); + self.poll() + } else if let Ok(failure) = Failure::try_from(stanza) { + Err(AuthError::Fail(failure.defined_condition))? + } else { Ok(Async::NotReady) - }, - Err(e) => - Err(e) - }, - ClientAuthState::Invalid => - unreachable!(), + } + } + Ok(Async::Ready(_event)) => { + // println!("ClientAuth ignore {:?}", _event); + Ok(Async::NotReady) + } + Ok(_) => { + self.state = ClientAuthState::WaitRecv(stream); + Ok(Async::NotReady) + } + Err(e) => Err(ProtocolError::Parser(e))?, + }, + ClientAuthState::Start(mut start) => match start.poll() { + Ok(Async::Ready(stream)) => Ok(Async::Ready(stream)), + Ok(Async::NotReady) => { + self.state = ClientAuthState::Start(start); + Ok(Async::NotReady) + } + Err(e) => Err(e), + }, + ClientAuthState::Invalid => unreachable!(), } } } diff --git a/src/client/bind.rs b/src/client/bind.rs index bfcca2bee78201a5cee33d8afd39ecdf7058bcf5..3758a563708083650694d4419f14bd186ba3d15d 100644 --- a/src/client/bind.rs +++ b/src/client/bind.rs @@ -1,9 +1,9 @@ +use futures::{sink, Async, Future, Poll, Stream}; use std::mem::replace; -use futures::{Future, Poll, Async, sink, Stream}; use tokio_io::{AsyncRead, AsyncWrite}; -use xmpp_parsers::iq::{Iq, IqType}; -use xmpp_parsers::bind::Bind; use try_from::TryFrom; +use xmpp_parsers::bind::Bind; +use xmpp_parsers::iq::{Iq, IqType}; use crate::xmpp_codec::Packet; use crate::xmpp_stream::XMPPStream; @@ -26,16 +26,17 @@ impl ClientBind { pub fn new(stream: XMPPStream) -> Self { match stream.stream_features.get_child("bind", NS_XMPP_BIND) { None => - // No resource binding available, - // return the (probably // usable) stream immediately - ClientBind::Unsupported(stream), + // No resource binding available, + // return the (probably // usable) stream immediately + { + ClientBind::Unsupported(stream) + } Some(_) => { let resource = stream.jid.resource.clone(); - let iq = Iq::from_set(Bind::new(resource)) - .with_id(BIND_REQ_ID.to_string()); + let iq = Iq::from_set(Bind::new(resource)).with_id(BIND_REQ_ID.to_string()); let send = stream.send_stanza(iq); ClientBind::WaitSend(send) - }, + } } } } @@ -48,59 +49,51 @@ impl Future for ClientBind { let state = replace(self, ClientBind::Invalid); match state { - ClientBind::Unsupported(stream) => - Ok(Async::Ready(stream)), - ClientBind::WaitSend(mut send) => { - match send.poll() { - Ok(Async::Ready(stream)) => { - replace(self, ClientBind::WaitRecv(stream)); - self.poll() - }, - Ok(Async::NotReady) => { - replace(self, ClientBind::WaitSend(send)); - Ok(Async::NotReady) - }, - Err(e) => - Err(e)? + ClientBind::Unsupported(stream) => Ok(Async::Ready(stream)), + ClientBind::WaitSend(mut send) => match send.poll() { + Ok(Async::Ready(stream)) => { + replace(self, ClientBind::WaitRecv(stream)); + self.poll() + } + Ok(Async::NotReady) => { + replace(self, ClientBind::WaitSend(send)); + Ok(Async::NotReady) } + Err(e) => Err(e)?, }, - ClientBind::WaitRecv(mut stream) => { - match stream.poll() { - Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => - match Iq::try_from(stanza) { - Ok(iq) => if iq.id == Some(BIND_REQ_ID.to_string()) { - match iq.payload { - IqType::Result(payload) => { - payload - .and_then(|payload| Bind::try_from(payload).ok()) - .map(|bind| match bind { - Bind::Jid(jid) => stream.jid = jid, - _ => {} - }); - Ok(Async::Ready(stream)) - }, - _ => - Err(ProtocolError::InvalidBindResponse)?, + ClientBind::WaitRecv(mut stream) => match stream.poll() { + Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => match Iq::try_from(stanza) { + Ok(iq) => { + if iq.id == Some(BIND_REQ_ID.to_string()) { + match iq.payload { + IqType::Result(payload) => { + payload + .and_then(|payload| Bind::try_from(payload).ok()) + .map(|bind| match bind { + Bind::Jid(jid) => stream.jid = jid, + _ => {} + }); + Ok(Async::Ready(stream)) } - } else { - Ok(Async::NotReady) - }, - _ => Ok(Async::NotReady), - }, - Ok(Async::Ready(_)) => { - replace(self, ClientBind::WaitRecv(stream)); - self.poll() - }, - Ok(Async::NotReady) => { - replace(self, ClientBind::WaitRecv(stream)); - Ok(Async::NotReady) - }, - Err(e) => - Err(e)?, + _ => Err(ProtocolError::InvalidBindResponse)?, + } + } else { + Ok(Async::NotReady) + } + } + _ => Ok(Async::NotReady), + }, + Ok(Async::Ready(_)) => { + replace(self, ClientBind::WaitRecv(stream)); + self.poll() + } + Ok(Async::NotReady) => { + replace(self, ClientBind::WaitRecv(stream)); + Ok(Async::NotReady) } + Err(e) => Err(e)?, }, - ClientBind::Invalid => - unreachable!(), + ClientBind::Invalid => unreachable!(), } } } diff --git a/src/client/mod.rs b/src/client/mod.rs index 72634a2ee02c93077f967a7b00b30e908d8c11c4..9e62373dc4f276d059b9203592ede8886139bfd4 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,19 +1,19 @@ +use futures::{done, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream}; +use idna; +use jid::{Jid, JidParseError}; +use minidom::Element; +use sasl::common::{ChannelBinding, Credentials}; use std::mem::replace; use std::str::FromStr; use tokio::net::TcpStream; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_tls::TlsStream; -use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink, done}; -use minidom::Element; -use jid::{Jid, JidParseError}; -use sasl::common::{Credentials, ChannelBinding}; -use idna; +use super::event::Event; +use super::happy_eyeballs::Connecter; +use super::starttls::{StartTlsClient, NS_XMPP_TLS}; use super::xmpp_codec::Packet; use super::xmpp_stream; -use super::starttls::{NS_XMPP_TLS, StartTlsClient}; -use super::happy_eyeballs::Connecter; -use super::event::Event; use super::{Error, ProtocolError}; mod auth; @@ -34,7 +34,7 @@ const NS_JABBER_CLIENT: &str = "jabber:client"; enum ClientState { Invalid, Disconnected, - Connecting(Box>), + Connecting(Box>), Connected(XMPPStream), } @@ -53,51 +53,62 @@ impl Client { }) } - fn make_connect(jid: Jid, password: String) -> impl Future { + fn make_connect(jid: Jid, password: String) -> impl Future { let username = jid.node.as_ref().unwrap().to_owned(); let jid1 = jid.clone(); let jid2 = jid.clone(); let password = password; done(idna::domain_to_ascii(&jid.domain)) .map_err(|_| Error::Idna) - .and_then(|domain| - done(Connecter::from_lookup(&domain, Some("_xmpp-client._tcp"), 5222)) - ) + .and_then(|domain| { + done(Connecter::from_lookup( + &domain, + Some("_xmpp-client._tcp"), + 5222, + )) + }) .flatten() - .and_then(move |tcp_stream| - xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_CLIENT.to_owned()) - ).and_then(|xmpp_stream| { + .and_then(move |tcp_stream| { + xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_CLIENT.to_owned()) + }) + .and_then(|xmpp_stream| { if Self::can_starttls(&xmpp_stream) { Ok(Self::starttls(xmpp_stream)) } else { Err(Error::Protocol(ProtocolError::NoTls)) } - }).flatten() - .and_then(|tls_stream| - XMPPStream::start(tls_stream, jid2, NS_JABBER_CLIENT.to_owned()) - ).and_then(move |xmpp_stream| - done(Self::auth(xmpp_stream, username, password)) - // TODO: flatten? - ).and_then(|auth| auth) + }) + .flatten() + .and_then(|tls_stream| XMPPStream::start(tls_stream, jid2, NS_JABBER_CLIENT.to_owned())) + .and_then( + move |xmpp_stream| done(Self::auth(xmpp_stream, username, password)), // TODO: flatten? + ) + .and_then(|auth| auth) + .and_then(|xmpp_stream| Self::bind(xmpp_stream)) .and_then(|xmpp_stream| { - Self::bind(xmpp_stream) - }).and_then(|xmpp_stream| { // println!("Bound to {}", xmpp_stream.jid); Ok(xmpp_stream) }) } fn can_starttls(stream: &xmpp_stream::XMPPStream) -> bool { - stream.stream_features + stream + .stream_features .get_child("starttls", NS_XMPP_TLS) .is_some() } - fn starttls(stream: xmpp_stream::XMPPStream) -> StartTlsClient { + fn starttls( + stream: xmpp_stream::XMPPStream, + ) -> StartTlsClient { StartTlsClient::from_stream(stream) } - fn auth(stream: xmpp_stream::XMPPStream, username: String, password: String) -> Result, Error> { + fn auth( + stream: xmpp_stream::XMPPStream, + username: String, + password: String, + ) -> Result, Error> { let creds = Credentials::default() .with_username(username) .with_password(password) @@ -118,31 +129,25 @@ impl Stream for Client { let state = replace(&mut self.state, ClientState::Invalid); match state { - ClientState::Invalid => - Err(Error::InvalidState), - ClientState::Disconnected => - Ok(Async::Ready(None)), - ClientState::Connecting(mut connect) => { - match connect.poll() { - Ok(Async::Ready(stream)) => { - self.state = ClientState::Connected(stream); - Ok(Async::Ready(Some(Event::Online))) - }, - Ok(Async::NotReady) => { - self.state = ClientState::Connecting(connect); - Ok(Async::NotReady) - }, - Err(e) => - Err(e), + ClientState::Invalid => Err(Error::InvalidState), + ClientState::Disconnected => Ok(Async::Ready(None)), + ClientState::Connecting(mut connect) => match connect.poll() { + Ok(Async::Ready(stream)) => { + self.state = ClientState::Connected(stream); + Ok(Async::Ready(Some(Event::Online))) + } + Ok(Async::NotReady) => { + self.state = ClientState::Connecting(connect); + Ok(Async::NotReady) } + Err(e) => Err(e), }, ClientState::Connected(mut stream) => { // Poll sink match stream.poll_complete() { Ok(Async::NotReady) => (), Ok(Async::Ready(())) => (), - Err(e) => - return Err(e)?, + Err(e) => return Err(e)?, }; // Poll stream @@ -151,20 +156,18 @@ impl Stream for Client { // EOF self.state = ClientState::Disconnected; Ok(Async::Ready(Some(Event::Disconnected))) - }, + } Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => { self.state = ClientState::Connected(stream); Ok(Async::Ready(Some(Event::Stanza(stanza)))) - }, - Ok(Async::NotReady) | - Ok(Async::Ready(_)) => { + } + Ok(Async::NotReady) | Ok(Async::Ready(_)) => { self.state = ClientState::Connected(stream); Ok(Async::NotReady) - }, - Err(e) => - Err(e)?, + } + Err(e) => Err(e)?, } - }, + } } } } @@ -175,30 +178,23 @@ impl Sink for Client { fn start_send(&mut self, item: Self::SinkItem) -> StartSend { match self.state { - ClientState::Connected(ref mut stream) => - match stream.start_send(Packet::Stanza(item)) { - Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) => - Ok(AsyncSink::NotReady(stanza)), - Ok(AsyncSink::NotReady(_)) => - panic!("Client.start_send with stanza but got something else back"), - Ok(AsyncSink::Ready) => { - Ok(AsyncSink::Ready) - }, - Err(e) => - Err(e)?, - }, - _ => - Ok(AsyncSink::NotReady(item)), + ClientState::Connected(ref mut stream) => match stream.start_send(Packet::Stanza(item)) + { + Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) => Ok(AsyncSink::NotReady(stanza)), + Ok(AsyncSink::NotReady(_)) => { + panic!("Client.start_send with stanza but got something else back") + } + Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready), + Err(e) => Err(e)?, + }, + _ => Ok(AsyncSink::NotReady(item)), } } fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { match self.state { - ClientState::Connected(ref mut stream) => - stream.poll_complete() - .map_err(|e| e.into()), - _ => - Ok(Async::Ready(())), + ClientState::Connected(ref mut stream) => stream.poll_complete().map_err(|e| e.into()), + _ => Ok(Async::Ready(())), } } } diff --git a/src/component/auth.rs b/src/component/auth.rs index ce55b3a9b2b9f365427721282872466beb5558ac..92ae0903cd8ab4aa93a0658cd8a7486f521878f8 100644 --- a/src/component/auth.rs +++ b/src/component/auth.rs @@ -1,11 +1,11 @@ +use futures::{sink, Async, Future, Poll, Stream}; use std::mem::replace; -use futures::{Future, Poll, Async, sink, Stream}; use tokio_io::{AsyncRead, AsyncWrite}; use xmpp_parsers::component::Handshake; use crate::xmpp_codec::Packet; use crate::xmpp_stream::XMPPStream; -use crate::{Error, AuthError}; +use crate::{AuthError, Error}; const NS_JABBER_COMPONENT_ACCEPT: &str = "jabber:component:accept"; @@ -29,7 +29,7 @@ impl ComponentAuth { }; this.send( stream, - Handshake::from_password_and_stream_id(&password, &sid) + Handshake::from_password_and_stream_id(&password, &sid), ); Ok(this) } @@ -50,45 +50,40 @@ impl Future for ComponentAuth { let state = replace(&mut self.state, ComponentAuthState::Invalid); match state { - ComponentAuthState::WaitSend(mut send) => - match send.poll() { - Ok(Async::Ready(stream)) => { - self.state = ComponentAuthState::WaitRecv(stream); - self.poll() - }, - Ok(Async::NotReady) => { - self.state = ComponentAuthState::WaitSend(send); - Ok(Async::NotReady) - }, - Err(e) => - Err(e)? - }, - ComponentAuthState::WaitRecv(mut stream) => - match stream.poll() { - Ok(Async::Ready(Some(Packet::Stanza(ref stanza)))) - if stanza.is("handshake", NS_JABBER_COMPONENT_ACCEPT) => - { - self.state = ComponentAuthState::Invalid; - Ok(Async::Ready(stream)) - }, - Ok(Async::Ready(Some(Packet::Stanza(ref stanza)))) - if stanza.is("error", "http://etherx.jabber.org/streams") => - { - Err(AuthError::ComponentFail.into()) - }, - Ok(Async::Ready(event)) => { - println!("ComponentAuth ignore {:?}", event); - Ok(Async::NotReady) - }, - Ok(_) => { - self.state = ComponentAuthState::WaitRecv(stream); - Ok(Async::NotReady) - }, - Err(e) => - Err(e)? - }, - ComponentAuthState::Invalid => - unreachable!(), + ComponentAuthState::WaitSend(mut send) => match send.poll() { + Ok(Async::Ready(stream)) => { + self.state = ComponentAuthState::WaitRecv(stream); + self.poll() + } + Ok(Async::NotReady) => { + self.state = ComponentAuthState::WaitSend(send); + Ok(Async::NotReady) + } + Err(e) => Err(e)?, + }, + ComponentAuthState::WaitRecv(mut stream) => match stream.poll() { + Ok(Async::Ready(Some(Packet::Stanza(ref stanza)))) + if stanza.is("handshake", NS_JABBER_COMPONENT_ACCEPT) => + { + self.state = ComponentAuthState::Invalid; + Ok(Async::Ready(stream)) + } + Ok(Async::Ready(Some(Packet::Stanza(ref stanza)))) + if stanza.is("error", "http://etherx.jabber.org/streams") => + { + Err(AuthError::ComponentFail.into()) + } + Ok(Async::Ready(event)) => { + println!("ComponentAuth ignore {:?}", event); + Ok(Async::NotReady) + } + Ok(_) => { + self.state = ComponentAuthState::WaitRecv(stream); + Ok(Async::NotReady) + } + Err(e) => Err(e)?, + }, + ComponentAuthState::Invalid => unreachable!(), } } } diff --git a/src/component/mod.rs b/src/component/mod.rs index ce55e7ecdba1c7548c62c2b1e3c15fe4e4197c9e..aac66ab4e5e64e40eac3a5233b9cf7cc4831bf0e 100644 --- a/src/component/mod.rs +++ b/src/component/mod.rs @@ -1,18 +1,18 @@ //! Components in XMPP are services/gateways that are logged into an //! XMPP server under a JID consisting of just a domain name. They are //! allowed to use any user and resource identifiers in their stanzas. +use futures::{done, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream}; +use jid::{Jid, JidParseError}; +use minidom::Element; use std::mem::replace; use std::str::FromStr; use tokio::net::TcpStream; use tokio_io::{AsyncRead, AsyncWrite}; -use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink, done}; -use minidom::Element; -use jid::{Jid, JidParseError}; +use super::event::Event; +use super::happy_eyeballs::Connecter; use super::xmpp_codec::Packet; use super::xmpp_stream; -use super::happy_eyeballs::Connecter; -use super::event::Event; use super::Error; mod auth; @@ -31,7 +31,7 @@ const NS_JABBER_COMPONENT_ACCEPT: &str = "jabber:component:accept"; enum ComponentState { Invalid, Disconnected, - Connecting(Box>), + Connecting(Box>), Connected(XMPPStream), } @@ -50,19 +50,30 @@ impl Component { }) } - fn make_connect(jid: Jid, password: String, server: &str, port: u16) -> impl Future { + fn make_connect( + jid: Jid, + password: String, + server: &str, + port: u16, + ) -> impl Future { let jid1 = jid.clone(); let password = password; done(Connecter::from_lookup(server, None, port)) .flatten() .and_then(move |tcp_stream| { - xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_COMPONENT_ACCEPT.to_owned()) - }).and_then(move |xmpp_stream| { - Self::auth(xmpp_stream, password).expect("auth") + xmpp_stream::XMPPStream::start( + tcp_stream, + jid1, + NS_JABBER_COMPONENT_ACCEPT.to_owned(), + ) }) + .and_then(move |xmpp_stream| Self::auth(xmpp_stream, password).expect("auth")) } - fn auth(stream: xmpp_stream::XMPPStream, password: String) -> Result, Error> { + fn auth( + stream: xmpp_stream::XMPPStream, + password: String, + ) -> Result, Error> { ComponentAuth::new(stream, password) } } @@ -75,31 +86,25 @@ impl Stream for Component { let state = replace(&mut self.state, ComponentState::Invalid); match state { - ComponentState::Invalid => - Err(Error::InvalidState), - ComponentState::Disconnected => - Ok(Async::Ready(None)), - ComponentState::Connecting(mut connect) => { - match connect.poll() { - Ok(Async::Ready(stream)) => { - self.state = ComponentState::Connected(stream); - Ok(Async::Ready(Some(Event::Online))) - }, - Ok(Async::NotReady) => { - self.state = ComponentState::Connecting(connect); - Ok(Async::NotReady) - }, - Err(e) => - Err(e), + ComponentState::Invalid => Err(Error::InvalidState), + ComponentState::Disconnected => Ok(Async::Ready(None)), + ComponentState::Connecting(mut connect) => match connect.poll() { + Ok(Async::Ready(stream)) => { + self.state = ComponentState::Connected(stream); + Ok(Async::Ready(Some(Event::Online))) + } + Ok(Async::NotReady) => { + self.state = ComponentState::Connecting(connect); + Ok(Async::NotReady) } + Err(e) => Err(e), }, ComponentState::Connected(mut stream) => { // Poll sink match stream.poll_complete() { Ok(Async::NotReady) => (), Ok(Async::Ready(())) => (), - Err(e) => - return Err(e)?, + Err(e) => return Err(e)?, }; // Poll stream @@ -107,24 +112,23 @@ impl Stream for Component { Ok(Async::NotReady) => { self.state = ComponentState::Connected(stream); Ok(Async::NotReady) - }, + } Ok(Async::Ready(None)) => { // EOF self.state = ComponentState::Disconnected; Ok(Async::Ready(Some(Event::Disconnected))) - }, + } Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => { self.state = ComponentState::Connected(stream); Ok(Async::Ready(Some(Event::Stanza(stanza)))) - }, + } Ok(Async::Ready(_)) => { self.state = ComponentState::Connected(stream); Ok(Async::NotReady) - }, - Err(e) => - Err(e)?, + } + Err(e) => Err(e)?, } - }, + } } } } @@ -135,30 +139,26 @@ impl Sink for Component { fn start_send(&mut self, item: Self::SinkItem) -> StartSend { match self.state { - ComponentState::Connected(ref mut stream) => - match stream.start_send(Packet::Stanza(item)) { - Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) => - Ok(AsyncSink::NotReady(stanza)), - Ok(AsyncSink::NotReady(_)) => - panic!("Component.start_send with stanza but got something else back"), - Ok(AsyncSink::Ready) => { - Ok(AsyncSink::Ready) - }, - Err(e) => - Err(e)?, - }, - _ => - Ok(AsyncSink::NotReady(item)), + ComponentState::Connected(ref mut stream) => match stream + .start_send(Packet::Stanza(item)) + { + Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) => Ok(AsyncSink::NotReady(stanza)), + Ok(AsyncSink::NotReady(_)) => { + panic!("Component.start_send with stanza but got something else back") + } + Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready), + Err(e) => Err(e)?, + }, + _ => Ok(AsyncSink::NotReady(item)), } } fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { match &mut self.state { - &mut ComponentState::Connected(ref mut stream) => - stream.poll_complete() - .map_err(|e| e.into()), - _ => - Ok(Async::Ready(())), + &mut ComponentState::Connected(ref mut stream) => { + stream.poll_complete().map_err(|e| e.into()) + } + _ => Ok(Async::Ready(())), } } } diff --git a/src/error.rs b/src/error.rs index 43f5af1480b34de631a5a9dcd3679bb87d305611..26d794495ebff0562e894f022a955b1ea44e32c0 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,11 +1,11 @@ -use std::io::Error as IoError; -use std::error::Error as StdError; -use std::str::Utf8Error; +use native_tls::Error as TlsError; use std::borrow::Cow; +use std::error::Error as StdError; use std::fmt; -use native_tls::Error as TlsError; -use trust_dns_resolver::error::ResolveError; +use std::io::Error as IoError; +use std::str::Utf8Error; use trust_dns_proto::error::ProtoError; +use trust_dns_resolver::error::ResolveError; use xmpp_parsers::error::Error as ParsersError; use xmpp_parsers::sasl::DefinedCondition as SaslDefinedCondition; diff --git a/src/happy_eyeballs.rs b/src/happy_eyeballs.rs index 451f9c73499a9e6137c3f402b73de41fd14fd551..19dbe4939d4e7e51d84fa8843646ddd10cef5eac 100644 --- a/src/happy_eyeballs.rs +++ b/src/happy_eyeballs.rs @@ -1,18 +1,18 @@ -use std::mem; -use std::io::Error as IoError; -use std::net::SocketAddr; +use crate::{ConnecterError, Error}; +use futures::{Async, Future, Poll}; +use std::cell::RefCell; use std::collections::BTreeMap; use std::collections::VecDeque; -use std::cell::RefCell; -use futures::{Future, Poll, Async}; -use tokio::net::TcpStream; +use std::io::Error as IoError; +use std::mem; +use std::net::SocketAddr; use tokio::net::tcp::ConnectFuture; -use trust_dns_resolver::{IntoName, Name, ResolverFuture, error::ResolveError}; +use tokio::net::TcpStream; +use trust_dns_resolver::config::LookupIpStrategy; use trust_dns_resolver::lookup::SrvLookupFuture; use trust_dns_resolver::lookup_ip::LookupIpFuture; use trust_dns_resolver::system_conf; -use trust_dns_resolver::config::LookupIpStrategy; -use crate::{Error, ConnecterError}; +use trust_dns_resolver::{error::ResolveError, IntoName, Name, ResolverFuture}; enum State { AwaitResolver(Box + Send>), @@ -31,23 +31,26 @@ pub struct Connecter { error: Option, } -fn resolver_future() -> Result + Send>, IoError> { +fn resolver_future( +) -> Result + Send>, IoError> { let (conf, mut opts) = system_conf::read_system_conf()?; opts.ip_strategy = LookupIpStrategy::Ipv4AndIpv6; Ok(ResolverFuture::new(conf, opts)) } impl Connecter { - pub fn from_lookup(domain: &str, srv: Option<&str>, fallback_port: u16) -> Result { + pub fn from_lookup( + domain: &str, + srv: Option<&str>, + fallback_port: u16, + ) -> Result { if let Ok(ip) = domain.parse() { // use specified IP address, not domain name, skip the whole dns part - let connect = - RefCell::new(TcpStream::connect(&SocketAddr::new(ip, fallback_port))); + let connect = RefCell::new(TcpStream::connect(&SocketAddr::new(ip, fallback_port))); return Ok(Connecter { fallback_port, srv_domain: None, - domain: "nohost".into_name() - .map_err(ConnecterError::Dns)?, + domain: "nohost".into_name().map_err(ConnecterError::Dns)?, state: State::Connecting(None, vec![connect]), targets: VecDeque::new(), error: None, @@ -56,20 +59,18 @@ impl Connecter { let state = State::AwaitResolver(resolver_future()?); let srv_domain = match srv { - Some(srv) => - Some(format!("{}.{}.", srv, domain) - .into_name() - .map_err(ConnecterError::Dns)? - ), - None => - None, + Some(srv) => Some( + format!("{}.{}.", srv, domain) + .into_name() + .map_err(ConnecterError::Dns)?, + ), + None => None, }; Ok(Connecter { fallback_port, srv_domain, - domain: domain.into_name() - .map_err(ConnecterError::Dns)?, + domain: domain.into_name().map_err(ConnecterError::Dns)?, state, targets: VecDeque::new(), error: None, @@ -97,8 +98,8 @@ impl Future for Connecter { self.state = State::ResolveSrv(resolver, srv_lookup); } None => { - self.targets = - [(self.domain.clone(), self.fallback_port)].into_iter() + self.targets = [(self.domain.clone(), self.fallback_port)] + .into_iter() .cloned() .collect(); self.state = State::Connecting(Some(resolver), vec![]); @@ -115,22 +116,19 @@ impl Future for Connecter { Ok(Async::NotReady) } Ok(Async::Ready(srv_result)) => { - let srv_map: BTreeMap<_, _> = - srv_result.iter() + let srv_map: BTreeMap<_, _> = srv_result + .iter() .map(|srv| (srv.priority(), (srv.target().clone(), srv.port()))) .collect(); - let targets = - srv_map.into_iter() - .map(|(_, tp)| tp) - .collect(); + let targets = srv_map.into_iter().map(|(_, tp)| tp).collect(); self.targets = targets; self.state = State::Connecting(Some(resolver), vec![]); self.poll() } Err(_) => { // ignore, fallback - self.targets = - [(self.domain.clone(), self.fallback_port)].into_iter() + self.targets = [(self.domain.clone(), self.fallback_port)] + .into_iter() .cloned() .collect(); self.state = State::Connecting(Some(resolver), vec![]); @@ -147,36 +145,31 @@ impl Future for Connecter { self.poll() } else if connects.len() > 0 { let mut success = None; - connects.retain(|connect| { - match connect.borrow_mut().poll() { - Ok(Async::NotReady) => true, - Ok(Async::Ready(connection)) => { - success = Some(connection); - false + connects.retain(|connect| match connect.borrow_mut().poll() { + Ok(Async::NotReady) => true, + Ok(Async::Ready(connection)) => { + success = Some(connection); + false + } + Err(e) => { + if self.error.is_none() { + self.error = Some(e.into()); } - Err(e) => { - if self.error.is_none() { - self.error = Some(e.into()); - } - false - }, + false } }); match success { - Some(connection) => - Ok(Async::Ready(connection)), + Some(connection) => Ok(Async::Ready(connection)), None => { self.state = State::Connecting(resolver, connects); Ok(Async::NotReady) - }, + } } } else { // All targets tried match self.error.take() { - None => - Err(ConnecterError::AllFailed.into()), - Some(e) => - Err(e), + None => Err(ConnecterError::AllFailed.into()), + Some(e) => Err(e), } } } @@ -187,8 +180,8 @@ impl Future for Connecter { Ok(Async::NotReady) } Ok(Async::Ready(ip_result)) => { - let connects = - ip_result.iter() + let connects = ip_result + .iter() .map(|ip| RefCell::new(TcpStream::connect(&SocketAddr::new(ip, port)))) .collect(); self.state = State::Connecting(Some(resolver), connects); @@ -204,8 +197,7 @@ impl Future for Connecter { } } } - _ => panic!("") + _ => panic!(""), } } } - diff --git a/src/lib.rs b/src/lib.rs index b1942119211f44ce2b68f0d30b05460875077cd0..39876b1917baaed46289aa35a5fd7e5d7997d36a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,17 +5,17 @@ #[macro_use] extern crate derive_error; +mod starttls; +mod stream_start; pub mod xmpp_codec; pub mod xmpp_stream; -mod stream_start; -mod starttls; pub use crate::starttls::StartTlsClient; -mod happy_eyeballs; mod event; +mod happy_eyeballs; pub use crate::event::Event; mod client; pub use crate::client::Client; mod component; pub use crate::component::Component; mod error; -pub use crate::error::{Error, ProtocolError, AuthError, ConnecterError, ParseError, ParserError}; +pub use crate::error::{AuthError, ConnecterError, Error, ParseError, ParserError, ProtocolError}; diff --git a/src/starttls.rs b/src/starttls.rs index a7f7fd392bc19c80f1be419441aa5d6e0e566697..e48d1824a1220f98b70b015098cc9c93ec9a9b0a 100644 --- a/src/starttls.rs +++ b/src/starttls.rs @@ -1,12 +1,12 @@ -use std::mem::replace; -use futures::{Future, Sink, Poll, Async}; -use futures::stream::Stream; use futures::sink; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_tls::{TlsStream, TlsConnector, Connect}; -use native_tls::TlsConnector as NativeTlsConnector; -use minidom::Element; +use futures::stream::Stream; +use futures::{Async, Future, Poll, Sink}; use jid::Jid; +use minidom::Element; +use native_tls::TlsConnector as NativeTlsConnector; +use std::mem::replace; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_tls::{Connect, TlsConnector, TlsStream}; use crate::xmpp_codec::Packet; use crate::xmpp_stream::XMPPStream; @@ -15,7 +15,6 @@ use crate::Error; /// XMPP TLS XML namespace pub const NS_XMPP_TLS: &str = "urn:ietf:params:xml:ns:xmpp-tls"; - /// XMPP stream that switches to TLS if available in received features pub struct StartTlsClient { state: StartTlsClientState, @@ -34,9 +33,7 @@ impl StartTlsClient { pub fn from_stream(xmpp_stream: XMPPStream) -> Self { let jid = xmpp_stream.jid.clone(); - let nonza = Element::builder("starttls") - .ns(NS_XMPP_TLS) - .build(); + let nonza = Element::builder("starttls").ns(NS_XMPP_TLS).build(); let packet = Packet::Stanza(nonza); let send = xmpp_stream.send(packet); @@ -56,51 +53,56 @@ impl Future for StartTlsClient { let mut retry = false; let (new_state, result) = match old_state { - StartTlsClientState::SendStartTls(mut send) => - match send.poll() { - Ok(Async::Ready(xmpp_stream)) => { - let new_state = StartTlsClientState::AwaitProceed(xmpp_stream); - retry = true; - (new_state, Ok(Async::NotReady)) - }, - Ok(Async::NotReady) => - (StartTlsClientState::SendStartTls(send), Ok(Async::NotReady)), - Err(e) => - (StartTlsClientState::SendStartTls(send), Err(e.into())), - }, - StartTlsClientState::AwaitProceed(mut xmpp_stream) => - match xmpp_stream.poll() { - Ok(Async::Ready(Some(Packet::Stanza(ref stanza)))) - if stanza.name() == "proceed" => - { - let stream = xmpp_stream.stream.into_inner(); - let connect = TlsConnector::from(NativeTlsConnector::builder() - .build().unwrap()) + StartTlsClientState::SendStartTls(mut send) => match send.poll() { + Ok(Async::Ready(xmpp_stream)) => { + let new_state = StartTlsClientState::AwaitProceed(xmpp_stream); + retry = true; + (new_state, Ok(Async::NotReady)) + } + Ok(Async::NotReady) => { + (StartTlsClientState::SendStartTls(send), Ok(Async::NotReady)) + } + Err(e) => (StartTlsClientState::SendStartTls(send), Err(e.into())), + }, + StartTlsClientState::AwaitProceed(mut xmpp_stream) => match xmpp_stream.poll() { + Ok(Async::Ready(Some(Packet::Stanza(ref stanza)))) + if stanza.name() == "proceed" => + { + let stream = xmpp_stream.stream.into_inner(); + let connect = + TlsConnector::from(NativeTlsConnector::builder().build().unwrap()) .connect(&self.jid.domain, stream); - let new_state = StartTlsClientState::StartingTls(connect); - retry = true; - (new_state, Ok(Async::NotReady)) - }, - Ok(Async::Ready(value)) => { - println!("StartTlsClient ignore {:?}", value); - (StartTlsClientState::AwaitProceed(xmpp_stream), Ok(Async::NotReady)) - }, - Ok(_) => - (StartTlsClientState::AwaitProceed(xmpp_stream), Ok(Async::NotReady)), - Err(e) => - (StartTlsClientState::AwaitProceed(xmpp_stream), Err(Error::Protocol(e.into()))), - }, - StartTlsClientState::StartingTls(mut connect) => - match connect.poll() { - Ok(Async::Ready(tls_stream)) => - (StartTlsClientState::Invalid, Ok(Async::Ready(tls_stream))), - Ok(Async::NotReady) => - (StartTlsClientState::StartingTls(connect), Ok(Async::NotReady)), - Err(e) => - (StartTlsClientState::Invalid, Err(e.into())), - }, - StartTlsClientState::Invalid => - unreachable!(), + let new_state = StartTlsClientState::StartingTls(connect); + retry = true; + (new_state, Ok(Async::NotReady)) + } + Ok(Async::Ready(value)) => { + println!("StartTlsClient ignore {:?}", value); + ( + StartTlsClientState::AwaitProceed(xmpp_stream), + Ok(Async::NotReady), + ) + } + Ok(_) => ( + StartTlsClientState::AwaitProceed(xmpp_stream), + Ok(Async::NotReady), + ), + Err(e) => ( + StartTlsClientState::AwaitProceed(xmpp_stream), + Err(Error::Protocol(e.into())), + ), + }, + StartTlsClientState::StartingTls(mut connect) => match connect.poll() { + Ok(Async::Ready(tls_stream)) => { + (StartTlsClientState::Invalid, Ok(Async::Ready(tls_stream))) + } + Ok(Async::NotReady) => ( + StartTlsClientState::StartingTls(connect), + Ok(Async::NotReady), + ), + Err(e) => (StartTlsClientState::Invalid, Err(e.into())), + }, + StartTlsClientState::Invalid => unreachable!(), }; self.state = new_state; diff --git a/src/stream_start.rs b/src/stream_start.rs index 9aef117da7a622d2419cecd03256d4d9f31908bf..0a5945b7bda5e6d8dae908f23d20c59b3b5bb4e6 100644 --- a/src/stream_start.rs +++ b/src/stream_start.rs @@ -1,11 +1,11 @@ -use std::mem::replace; -use futures::{Future, Async, Poll, Stream, sink, Sink}; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_codec::Framed; +use futures::{sink, Async, Future, Poll, Sink, Stream}; use jid::Jid; use minidom::Element; +use std::mem::replace; +use tokio_codec::Framed; +use tokio_io::{AsyncRead, AsyncWrite}; -use crate::xmpp_codec::{XMPPCodec, Packet}; +use crate::xmpp_codec::{Packet, XMPPCodec}; use crate::xmpp_stream::XMPPStream; use crate::{Error, ProtocolError}; @@ -26,11 +26,15 @@ enum StreamStartState { impl StreamStart { pub fn from_stream(stream: Framed, jid: Jid, ns: String) -> Self { - let attrs = [("to".to_owned(), jid.domain.clone()), - ("version".to_owned(), "1.0".to_owned()), - ("xmlns".to_owned(), ns.clone()), - ("xmlns:stream".to_owned(), NS_XMPP_STREAM.to_owned()), - ].iter().cloned().collect(); + let attrs = [ + ("to".to_owned(), jid.domain.clone()), + ("version".to_owned(), "1.0".to_owned()), + ("xmlns".to_owned(), ns.clone()), + ("xmlns:stream".to_owned(), NS_XMPP_STREAM.to_owned()), + ] + .iter() + .cloned() + .collect(); let send = stream.send(Packet::StreamStart(attrs)); StreamStart { @@ -50,59 +54,66 @@ impl Future for StreamStart { let mut retry = false; let (new_state, result) = match old_state { - StreamStartState::SendStart(mut send) => - match send.poll() { - Ok(Async::Ready(stream)) => { + StreamStartState::SendStart(mut send) => match send.poll() { + Ok(Async::Ready(stream)) => { + retry = true; + (StreamStartState::RecvStart(stream), Ok(Async::NotReady)) + } + Ok(Async::NotReady) => (StreamStartState::SendStart(send), Ok(Async::NotReady)), + Err(e) => (StreamStartState::Invalid, Err(e.into())), + }, + StreamStartState::RecvStart(mut stream) => match stream.poll() { + Ok(Async::Ready(Some(Packet::StreamStart(stream_attrs)))) => { + let stream_ns = stream_attrs + .get("xmlns") + .ok_or(ProtocolError::NoStreamNamespace)? + .clone(); + if self.ns == "jabber:client" { retry = true; - (StreamStartState::RecvStart(stream), Ok(Async::NotReady)) - }, - Ok(Async::NotReady) => - (StreamStartState::SendStart(send), Ok(Async::NotReady)), - Err(e) => - (StreamStartState::Invalid, Err(e.into())), - }, - StreamStartState::RecvStart(mut stream) => - match stream.poll() { - Ok(Async::Ready(Some(Packet::StreamStart(stream_attrs)))) => { - let stream_ns = stream_attrs.get("xmlns") - .ok_or(ProtocolError::NoStreamNamespace)? + // TODO: skip RecvFeatures for version < 1.0 + ( + StreamStartState::RecvFeatures(stream, stream_ns), + Ok(Async::NotReady), + ) + } else { + let id = stream_attrs + .get("id") + .ok_or(ProtocolError::NoStreamId)? .clone(); - if self.ns == "jabber:client" { - retry = true; - // TODO: skip RecvFeatures for version < 1.0 - (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady)) - } else { - let id = stream_attrs.get("id") - .ok_or(ProtocolError::NoStreamId)? - .clone(); - // FIXME: huge hack, shouldn’t be an element! - let stream = XMPPStream::new(self.jid.clone(), stream, self.ns.clone(), Element::builder(id).build()); - (StreamStartState::Invalid, Ok(Async::Ready(stream))) - } - }, - Ok(Async::Ready(_)) => - return Err(ProtocolError::InvalidToken.into()), - Ok(Async::NotReady) => - (StreamStartState::RecvStart(stream), Ok(Async::NotReady)), - Err(e) => - return Err(ProtocolError::from(e).into()), - }, - StreamStartState::RecvFeatures(mut stream, stream_ns) => - match stream.poll() { - Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => - if stanza.is("features", NS_XMPP_STREAM) { - let stream = XMPPStream::new(self.jid.clone(), stream, self.ns.clone(), stanza); - (StreamStartState::Invalid, Ok(Async::Ready(stream))) - } else { - (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady)) - }, - Ok(Async::Ready(_)) | Ok(Async::NotReady) => - (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady)), - Err(e) => - return Err(ProtocolError::from(e).into()), - }, - StreamStartState::Invalid => - unreachable!(), + // FIXME: huge hack, shouldn’t be an element! + let stream = XMPPStream::new( + self.jid.clone(), + stream, + self.ns.clone(), + Element::builder(id).build(), + ); + (StreamStartState::Invalid, Ok(Async::Ready(stream))) + } + } + Ok(Async::Ready(_)) => return Err(ProtocolError::InvalidToken.into()), + Ok(Async::NotReady) => (StreamStartState::RecvStart(stream), Ok(Async::NotReady)), + Err(e) => return Err(ProtocolError::from(e).into()), + }, + StreamStartState::RecvFeatures(mut stream, stream_ns) => match stream.poll() { + Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => { + if stanza.is("features", NS_XMPP_STREAM) { + let stream = + XMPPStream::new(self.jid.clone(), stream, self.ns.clone(), stanza); + (StreamStartState::Invalid, Ok(Async::Ready(stream))) + } else { + ( + StreamStartState::RecvFeatures(stream, stream_ns), + Ok(Async::NotReady), + ) + } + } + Ok(Async::Ready(_)) | Ok(Async::NotReady) => ( + StreamStartState::RecvFeatures(stream, stream_ns), + Ok(Async::NotReady), + ), + Err(e) => return Err(ProtocolError::from(e).into()), + }, + StreamStartState::Invalid => unreachable!(), }; self.state = new_state; diff --git a/src/xmpp_codec.rs b/src/xmpp_codec.rs index 4088552c9c9fea4fb4b0690b22cccc54fb5f2b02..e8c4e0538e1b47d308e947bc25d548e9a1ec83b6 100644 --- a/src/xmpp_codec.rs +++ b/src/xmpp_codec.rs @@ -1,22 +1,22 @@ //! XML stream parser for XMPP +use crate::{ParseError, ParserError}; +use bytes::{BufMut, BytesMut}; +use minidom::Element; +use quick_xml::Writer as EventWriter; use std; +use std::cell::RefCell; +use std::collections::vec_deque::VecDeque; +use std::collections::HashMap; use std::default::Default; +use std::fmt::Write; +use std::io; use std::iter::FromIterator; -use std::cell::RefCell; use std::rc::Rc; -use std::fmt::Write; use std::str::from_utf8; -use std::io; -use std::collections::HashMap; -use std::collections::vec_deque::VecDeque; -use tokio_codec::{Encoder, Decoder}; -use minidom::Element; -use xml5ever::tokenizer::{XmlTokenizer, TokenSink, Token, Tag, TagKind}; +use tokio_codec::{Decoder, Encoder}; use xml5ever::interface::Attribute; -use bytes::{BytesMut, BufMut}; -use quick_xml::Writer as EventWriter; -use crate::{ParserError, ParseError}; +use xml5ever::tokenizer::{Tag, TagKind, Token, TokenSink, XmlTokenizer}; /// Anything that can be sent or received on an XMPP/XML stream #[derive(Debug)] @@ -72,17 +72,21 @@ impl ParserSink { fn handle_start_tag(&mut self, tag: Tag) { let mut nss = HashMap::new(); - let is_prefix_xmlns = |attr: &Attribute| attr.name.prefix.as_ref() - .map(|prefix| prefix.eq_str_ignore_ascii_case("xmlns")) - .unwrap_or(false); + let is_prefix_xmlns = |attr: &Attribute| { + attr.name + .prefix + .as_ref() + .map(|prefix| prefix.eq_str_ignore_ascii_case("xmlns")) + .unwrap_or(false) + }; for attr in &tag.attrs { match attr.name.local.as_ref() { "xmlns" => { nss.insert(None, attr.value.as_ref().to_owned()); - }, + } prefix if is_prefix_xmlns(attr) => { - nss.insert(Some(prefix.to_owned()), attr.value.as_ref().to_owned()); - }, + nss.insert(Some(prefix.to_owned()), attr.value.as_ref().to_owned()); + } _ => (), } } @@ -90,10 +94,9 @@ impl ParserSink { let el = { let mut el_builder = Element::builder(tag.name.local.as_ref()); - if let Some(el_ns) = self.lookup_ns( - &tag.name.prefix.map(|prefix| - prefix.as_ref().to_owned()) - ) { + if let Some(el_ns) = + self.lookup_ns(&tag.name.prefix.map(|prefix| prefix.as_ref().to_owned())) + { el_builder = el_builder.ns(el_ns); } for attr in &tag.attrs { @@ -101,21 +104,20 @@ impl ParserSink { "xmlns" => (), _ if is_prefix_xmlns(attr) => (), _ => { - el_builder = el_builder.attr( - attr.name.local.as_ref(), - attr.value.as_ref() - ); - }, + el_builder = el_builder.attr(attr.name.local.as_ref(), attr.value.as_ref()); + } } } el_builder.build() }; if self.stack.is_empty() { - let attrs = HashMap::from_iter( - tag.attrs.iter() - .map(|attr| (attr.name.local.as_ref().to_owned(), attr.value.as_ref().to_owned())) - ); + let attrs = HashMap::from_iter(tag.attrs.iter().map(|attr| { + ( + attr.name.local.as_ref().to_owned(), + attr.value.as_ref().to_owned(), + ) + })); self.push_queue(Packet::StreamStart(attrs)); } @@ -128,15 +130,13 @@ impl ParserSink { match self.stack.len() { // - 0 => - self.push_queue(Packet::StreamEnd), + 0 => self.push_queue(Packet::StreamEnd), // - 1 => - self.push_queue(Packet::Stanza(el)), + 1 => self.push_queue(Packet::Stanza(el)), len => { let parent = &mut self.stack[len - 1]; parent.append_child(el); - }, + } } } } @@ -145,32 +145,26 @@ impl TokenSink for ParserSink { fn process_token(&mut self, token: Token) { match token { Token::TagToken(tag) => match tag.kind { - TagKind::StartTag => - self.handle_start_tag(tag), - TagKind::EndTag => - self.handle_end_tag(), + TagKind::StartTag => self.handle_start_tag(tag), + TagKind::EndTag => self.handle_end_tag(), TagKind::EmptyTag => { self.handle_start_tag(tag); self.handle_end_tag(); - }, - TagKind::ShortTag => - self.push_queue_error(ParserError::ShortTag), + } + TagKind::ShortTag => self.push_queue_error(ParserError::ShortTag), }, - Token::CharacterTokens(tendril) => - match self.stack.len() { - 0 | 1 => - self.push_queue(Packet::Text(tendril.into())), - len => { - let el = &mut self.stack[len - 1]; - el.append_text_node(tendril); - }, - }, - Token::EOFToken => - self.push_queue(Packet::StreamEnd), + Token::CharacterTokens(tendril) => match self.stack.len() { + 0 | 1 => self.push_queue(Packet::Text(tendril.into())), + len => { + let el = &mut self.stack[len - 1]; + el.append_text_node(tendril); + } + }, + Token::EOFToken => self.push_queue(Packet::StreamEnd), Token::ParseError(s) => { // println!("ParseError: {:?}", s); self.push_queue_error(ParserError::Parse(ParseError(s))); - }, + } _ => (), } } @@ -219,23 +213,22 @@ impl Decoder for XMPPCodec { type Error = ParserError; fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { - let buf1: Box> = - if ! self.buf.is_empty() && ! buf.is_empty() { - let mut prefix = std::mem::replace(&mut self.buf, vec![]); - prefix.extend_from_slice(buf.take().as_ref()); - Box::new(prefix) - } else { - Box::new(buf.take()) - }; + let buf1: Box> = if !self.buf.is_empty() && !buf.is_empty() { + let mut prefix = std::mem::replace(&mut self.buf, vec![]); + prefix.extend_from_slice(buf.take().as_ref()); + Box::new(prefix) + } else { + Box::new(buf.take()) + }; let buf1 = buf1.as_ref().as_ref(); match from_utf8(buf1) { Ok(s) => { - if ! s.is_empty() { + if !s.is_empty() { // println!("<< {}", s); let tendril = FromIterator::from_iter(s.chars()); self.parser.feed(tendril); } - }, + } // Remedies for truncated utf8 Err(e) if e.valid_up_to() >= buf1.len() - 3 => { // Prepare all the valid data @@ -249,17 +242,16 @@ impl Decoder for XMPPCodec { self.buf.extend_from_slice(&buf1[e.valid_up_to()..]); return result; - }, + } Err(e) => { // println!("error {} at {}/{} in {:?}", e, e.valid_up_to(), buf1.len(), buf1); return Err(ParserError::Utf8(e)); - }, + } } match self.queue.borrow_mut().pop_front() { None => Ok(None), - Some(result) => - result.map(|pkt| Some(pkt)), + Some(result) => result.map(|pkt| Some(pkt)), } } @@ -284,8 +276,7 @@ impl Encoder for XMPPCodec { let mut buf = String::new(); write!(buf, "\n").unwrap(); print!(">> {}", buf); - write!(dst, "{}", buf) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e)) - }, + write!(dst, "{}", buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e)) + } Packet::Stanza(stanza) => { - stanza.write_to_inner(&mut EventWriter::new(WriteBytes::new(dst))) + stanza + .write_to_inner(&mut EventWriter::new(WriteBytes::new(dst))) .and_then(|_| { // println!(">> {:?}", dst); Ok(()) }) .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("{}", e))) - }, + } Packet::Text(text) => { write_text(&text, dst) .and_then(|_| { @@ -311,9 +302,9 @@ impl Encoder for XMPPCodec { Ok(()) }) .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("{}", e))) - }, + } // TODO: Implement all - _ => Ok(()) + _ => Ok(()), } } } @@ -334,7 +325,7 @@ pub fn escape(input: &str) -> String { '>' => result.push_str(">"), '\'' => result.push_str("'"), '"' => result.push_str("""), - o => result.push(o) + o => result.push(o), } } result @@ -364,7 +355,6 @@ impl<'a> std::io::Write for WriteBytes<'a> { } } - #[cfg(test)] mod tests { use super::*; @@ -405,10 +395,7 @@ mod tests { b.put(r">"); let r = c.decode(&mut b); assert!(match r { - Ok(Some(Packet::Stanza(ref el))) - if el.name() == "test" - && el.text() == "ß" - => true, + Ok(Some(Packet::Stanza(ref el))) if el.name() == "test" && el.text() == "ß" => true, _ => false, }); } @@ -436,10 +423,7 @@ mod tests { b.put(&b"\x9f"[..]); let r = c.decode(&mut b); assert!(match r { - Ok(Some(Packet::Stanza(ref el))) - if el.name() == "test" - && el.text() == "ß" - => true, + Ok(Some(Packet::Stanza(ref el))) if el.name() == "test" && el.text() == "ß" => true, _ => false, }); } @@ -447,8 +431,8 @@ mod tests { /// By default, encode() only get's a BytesMut that has 8kb space reserved. #[test] fn test_large_stanza() { - use std::io::Cursor; use futures::{Future, Sink}; + use std::io::Cursor; use tokio_codec::FramedWrite; let framed = FramedWrite::new(Cursor::new(vec![]), XMPPCodec::new()); let mut text = "".to_owned(); @@ -456,15 +440,12 @@ mod tests { text = text + "A"; } let stanza = Element::builder("message") - .append( - Element::builder("body") - .append(&text) - .build() - ) + .append(Element::builder("body").append(&text).build()) .build(); - let framed = framed.send(Packet::Stanza(stanza)) - .wait() - .expect("send"); - assert_eq!(framed.get_ref().get_ref(), &("".to_owned() + &text + "").as_bytes()); + let framed = framed.send(Packet::Stanza(stanza)).wait().expect("send"); + assert_eq!( + framed.get_ref().get_ref(), + &("".to_owned() + &text + "").as_bytes() + ); } } diff --git a/src/xmpp_stream.rs b/src/xmpp_stream.rs index d8e878d5a2bf52451547808a090889cc2119c3bc..c67cc1efdd6ea8b53e84c784b2e88b4829fb6772 100644 --- a/src/xmpp_stream.rs +++ b/src/xmpp_stream.rs @@ -1,14 +1,14 @@ //! `XMPPStream` is the common container for all XMPP network connections -use futures::{Poll, Stream, Sink, StartSend}; use futures::sink::Send; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_codec::Framed; -use minidom::Element; +use futures::{Poll, Sink, StartSend, Stream}; use jid::Jid; +use minidom::Element; +use tokio_codec::Framed; +use tokio_io::{AsyncRead, AsyncWrite}; -use crate::xmpp_codec::{XMPPCodec, Packet}; use crate::stream_start::StreamStart; +use crate::xmpp_codec::{Packet, XMPPCodec}; /// namespace pub const NS_XMPP_STREAM: &str = "http://etherx.jabber.org/streams"; @@ -30,11 +30,18 @@ pub struct XMPPStream { impl XMPPStream { /// Constructor - pub fn new(jid: Jid, - stream: Framed, - ns: String, - stream_features: Element) -> Self { - XMPPStream { jid, stream, stream_features, ns } + pub fn new( + jid: Jid, + stream: Framed, + ns: String, + stream_features: Element, + ) -> Self { + XMPPStream { + jid, + stream, + stream_features, + ns, + } } /// Send a `` start tag