Merge branch 'migrate-tokio'

Astro created

Change summary

Cargo.toml                 |  10 +-
examples/echo_bot.rs       |  10 +-
examples/echo_component.rs |  12 +-
src/client/mod.rs          |  22 ++--
src/component/mod.rs       |  25 ++--
src/error.rs               |  32 +++++
src/happy_eyeballs.rs      | 199 ++++++++++++++++++++++++---------------
src/lib.rs                 |   5 
src/starttls.rs            |  14 +-
9 files changed, 196 insertions(+), 133 deletions(-)

Detailed changes

Cargo.toml 🔗

@@ -12,18 +12,18 @@ keywords = ["xmpp", "tokio"]
 
 [dependencies]
 futures = "0.1"
-tokio-core = "0.1"
+tokio = "0.1"
 tokio-io = "0.1"
 tokio-codec = "0.1"
 bytes = "0.4.9"
 xml5ever = "0.12"
 minidom = "0.9"
-# TODO: update to 0.2.0
-native-tls = "0.1"
-tokio-tls = "0.1"
+native-tls = "0.2"
+tokio-tls = "0.2"
 sasl = "0.4"
 jid = { version = "0.5", features = ["minidom"] }
-domain = "0.2"
+trust-dns-resolver = "0.9.1"
+trust-dns-proto = "0.4.0"
 xmpp-parsers = "0.11"
 idna = "0.1"
 try_from = "0.2"

examples/echo_bot.rs 🔗

@@ -1,5 +1,5 @@
 extern crate futures;
-extern crate tokio_core;
+extern crate tokio;
 extern crate tokio_xmpp;
 extern crate jid;
 extern crate minidom;
@@ -9,8 +9,8 @@ extern crate try_from;
 use std::env::args;
 use std::process::exit;
 use try_from::TryFrom;
-use tokio_core::reactor::Core;
 use futures::{Stream, Sink, future};
+use tokio::runtime::current_thread::Runtime;
 use tokio_xmpp::Client;
 use minidom::Element;
 use xmpp_parsers::presence::{Presence, Type as PresenceType, Show as PresenceShow};
@@ -27,9 +27,9 @@ fn main() {
     let password = &args[2];
 
     // tokio_core context
-    let mut core = Core::new().unwrap();
+    let mut rt = Runtime::new().unwrap();
     // Client instance
-    let client = Client::new(jid, password, core.handle()).unwrap();
+    let client = Client::new(jid, password).unwrap();
 
     // Make the two interfaces for sending and receiving independent
     // of each other so we can move one into a closure.
@@ -64,7 +64,7 @@ fn main() {
     });
 
     // Start polling `done`
-    match core.run(done) {
+    match rt.block_on(done) {
         Ok(_) => (),
         Err(e) => {
             println!("Fatal: {}", e);

examples/echo_component.rs 🔗

@@ -1,5 +1,5 @@
 extern crate futures;
-extern crate tokio_core;
+extern crate tokio;
 extern crate tokio_xmpp;
 extern crate jid;
 extern crate minidom;
@@ -10,7 +10,7 @@ use std::env::args;
 use std::process::exit;
 use std::str::FromStr;
 use try_from::TryFrom;
-use tokio_core::reactor::Core;
+use tokio::runtime::current_thread::Runtime;
 use futures::{Stream, Sink, future};
 use tokio_xmpp::Component;
 use minidom::Element;
@@ -30,10 +30,10 @@ fn main() {
     let port: u16 = args.get(4).unwrap().parse().unwrap_or(5347u16);
 
     // tokio_core context
-    let mut core = Core::new().unwrap();
+    let mut rt = Runtime::new().unwrap();
     // Component instance
-    println!("{} {} {} {} {:?}", jid, password, server, port, core.handle());
-    let component = Component::new(jid, password, server, port, core.handle()).unwrap();
+    println!("{} {} {} {}", jid, password, server, port);
+    let component = Component::new(jid, password, server, port).unwrap();
 
     // Make the two interfaces for sending and receiving independent
     // of each other so we can move one into a closure.
@@ -70,7 +70,7 @@ fn main() {
     });
 
     // Start polling `done`
-    match core.run(done) {
+    match rt.block_on(done) {
         Ok(_) => (),
         Err(e) => {
             println!("Fatal: {}", e);

src/client/mod.rs 🔗

@@ -1,8 +1,6 @@
 use std::mem::replace;
 use std::str::FromStr;
-use std::error::Error as StdError;
-use tokio_core::reactor::Handle;
-use tokio_core::net::TcpStream;
+use tokio::net::TcpStream;
 use tokio_io::{AsyncRead, AsyncWrite};
 use tokio_tls::TlsStream;
 use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink, done};
@@ -45,17 +43,17 @@ impl Client {
     ///
     /// Start polling the returned instance so that it will connect
     /// and yield events.
-    pub fn new(jid: &str, password: &str, handle: Handle) -> Result<Self, JidParseError> {
+    pub fn new(jid: &str, password: &str) -> Result<Self, JidParseError> {
         let jid = Jid::from_str(jid)?;
         let password = password.to_owned();
-        let connect = Self::make_connect(jid.clone(), password.clone(), handle);
+        let connect = Self::make_connect(jid.clone(), password.clone());
         Ok(Client {
             jid,
             state: ClientState::Connecting(Box::new(connect)),
         })
     }
 
-    fn make_connect(jid: Jid, password: String, handle: Handle) -> impl Future<Item=XMPPStream, Error=Error> {
+    fn make_connect(jid: Jid, password: String) -> impl Future<Item=XMPPStream, Error=Error> {
         let username = jid.node.as_ref().unwrap().to_owned();
         let jid1 = jid.clone();
         let jid2 = jid.clone();
@@ -63,8 +61,8 @@ impl Client {
         done(idna::domain_to_ascii(&jid.domain))
             .map_err(|_| Error::Idna)
             .and_then(|domain|
-                      done(Connecter::from_lookup(handle, &domain, "_xmpp-client._tcp", 5222))
-                      .map_err(Error::Domain)
+                      done(Connecter::from_lookup(&domain, "_xmpp-client._tcp", 5222))
+                      .map_err(Error::Connection)
             )
             .and_then(|connecter|
                       connecter
@@ -149,7 +147,7 @@ impl Stream for Client {
                     Ok(Async::NotReady) => (),
                     Ok(Async::Ready(())) => (),
                     Err(e) =>
-                        return Err(Error::Io(e)),
+                        return Err(e.into()),
                 };
 
                 // Poll stream
@@ -178,7 +176,7 @@ impl Stream for Client {
 
 impl Sink for Client {
     type SinkItem = Element;
-    type SinkError = String;
+    type SinkError = Error;
 
     fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
         match self.state {
@@ -192,7 +190,7 @@ impl Sink for Client {
                         Ok(AsyncSink::Ready)
                     },
                     Err(e) =>
-                        Err(e.description().to_owned()),
+                        Err(e.into()),
                 },
             _ =>
                 Ok(AsyncSink::NotReady(item)),
@@ -203,7 +201,7 @@ impl Sink for Client {
         match self.state {
             ClientState::Connected(ref mut stream) =>
                 stream.poll_complete()
-                .map_err(|e| e.description().to_owned()),
+                .map_err(|e| e.into()),
             _ =>
                 Ok(Async::Ready(())),
         }

src/component/mod.rs 🔗

@@ -3,9 +3,7 @@
 //! allowed to use any user and resource identifiers in their stanzas.
 use std::mem::replace;
 use std::str::FromStr;
-use std::error::Error as StdError;
-use tokio_core::reactor::Handle;
-use tokio_core::net::TcpStream;
+use tokio::net::TcpStream;
 use tokio_io::{AsyncRead, AsyncWrite};
 use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink, done};
 use minidom::Element;
@@ -42,24 +40,23 @@ impl Component {
     ///
     /// Start polling the returned instance so that it will connect
     /// and yield events.
-    pub fn new(jid: &str, password: &str, server: &str, port: u16, handle: Handle) -> Result<Self, JidParseError> {
+    pub fn new(jid: &str, password: &str, server: &str, port: u16) -> Result<Self, JidParseError> {
         let jid = Jid::from_str(jid)?;
         let password = password.to_owned();
-        let connect = Self::make_connect(jid.clone(), password, server, port, handle);
+        let connect = Self::make_connect(jid.clone(), password, server, port);
         Ok(Component {
             jid,
             state: ComponentState::Connecting(Box::new(connect)),
         })
     }
 
-    fn make_connect(jid: Jid, password: String, server: &str, port: u16, handle: Handle) -> impl Future<Item=XMPPStream, Error=Error> {
+    fn make_connect(jid: Jid, password: String, server: &str, port: u16) -> impl Future<Item=XMPPStream, Error=Error> {
         let jid1 = jid.clone();
         let password = password;
-        done(Connecter::from_lookup(handle, server, "_xmpp-component._tcp", port))
-            .map_err(Error::Domain)
-            .and_then(|connecter| connecter
-                      .map_err(Error::Connection)
-            ).and_then(move |tcp_stream| {
+        done(Connecter::from_lookup(server, "_xmpp-component._tcp", port))
+            .and_then(|connecter| connecter)
+            .map_err(Error::Connection)
+            .and_then(move |tcp_stream| {
                 xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_COMPONENT_ACCEPT.to_owned())
             }).and_then(move |xmpp_stream| {
                 Self::auth(xmpp_stream, password).expect("auth")
@@ -135,7 +132,7 @@ impl Stream for Component {
 
 impl Sink for Component {
     type SinkItem = Element;
-    type SinkError = String;
+    type SinkError = Error;
 
     fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
         match self.state {
@@ -149,7 +146,7 @@ impl Sink for Component {
                         Ok(AsyncSink::Ready)
                     },
                     Err(e) =>
-                        Err(e.description().to_owned()),
+                        Err(e.into()),
                 },
             _ =>
                 Ok(AsyncSink::NotReady(item)),
@@ -160,7 +157,7 @@ impl Sink for Component {
         match &mut self.state {
             &mut ComponentState::Connected(ref mut stream) =>
                 stream.poll_complete()
-                .map_err(|e| e.description().to_owned()),
+                .map_err(|e| e.into()),
             _ =>
                 Ok(Async::Ready(())),
         }

src/error.rs 🔗

@@ -3,9 +3,10 @@ use std::error::Error as StdError;
 use std::str::Utf8Error;
 use std::borrow::Cow;
 use std::fmt;
-use domain::resolv::error::Error as DNSError;
-use domain::bits::name::FromStrError;
 use native_tls::Error as TlsError;
+use trust_dns_resolver::error::ResolveError;
+use trust_dns_proto::error::ProtoError;
+
 use xmpp_parsers::error::Error as ParsersError;
 use xmpp_parsers::sasl::DefinedCondition as SaslDefinedCondition;
 
@@ -16,7 +17,6 @@ pub enum Error {
     /// DNS label conversion error, no details available from module
     /// `idna`
     Idna,
-    Domain(FromStrError),
     Protocol(ProtocolError),
     Auth(AuthError),
     Tls(TlsError),
@@ -90,5 +90,29 @@ pub enum AuthError {
 pub enum ConnecterError {
     NoSrv,
     AllFailed,
-    DNS(DNSError),
+    /// DNS name error
+    Domain(DomainError),
+    /// DNS resolution error
+    Dns(ProtoError),
+    /// DNS resolution error
+    Resolve(ResolveError),
+}
+
+/// DNS name error wrapper type
+#[derive(Debug)]
+pub struct DomainError(pub String);
+
+impl StdError for DomainError {
+    fn description(&self) -> &str {
+        &self.0
+    }
+    fn cause(&self) -> Option<&StdError> {
+        None
+    }
+}
+
+impl fmt::Display for DomainError {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{}", self.0)
+    }
 }

src/happy_eyeballs.rs 🔗

@@ -1,36 +1,44 @@
-use std::str::FromStr;
-use std::collections::HashMap;
-use std::net::SocketAddr;
-use futures::{Future, Poll, Async, Stream};
-use tokio_core::reactor::Handle;
-use tokio_core::net::{TcpStream, TcpStreamNew};
-use domain::resolv::Resolver;
-use domain::resolv::lookup::srv::{lookup_srv, LookupSrv, LookupSrvStream};
-use domain::bits::name::{DNameBuf, FromStrError};
+use std::mem;
+use std::net::{SocketAddr, IpAddr};
+use std::collections::{BTreeMap, btree_map};
+use std::collections::VecDeque;
+use futures::{Future, Poll, Async};
+use tokio::net::{ConnectFuture, TcpStream};
+use trust_dns_resolver::{IntoName, Name, ResolverFuture, error::ResolveError};
+use trust_dns_resolver::lookup::SrvLookupFuture;
+use trust_dns_resolver::lookup_ip::LookupIpFuture;
+use trust_dns_proto::rr::rdata::srv::SRV;
 use ConnecterError;
 
 pub struct Connecter {
-    handle: Handle,
-    resolver: Resolver,
-    lookup: Option<LookupSrv>,
-    srvs: Option<LookupSrvStream>,
-    connects: HashMap<SocketAddr, TcpStreamNew>,
+    fallback_port: u16,
+    name: Name,
+    domain: Name,
+    resolver_future: Box<Future<Item = ResolverFuture, Error = ResolveError> + Send>,
+    resolver_opt: Option<ResolverFuture>,
+    srv_lookup_opt: Option<SrvLookupFuture>,
+    srvs_opt: Option<btree_map::IntoIter<u16, SRV>>,
+    ip_lookup_opt: Option<(u16, LookupIpFuture)>,
+    ips_opt: Option<(u16, VecDeque<IpAddr>)>,
+    connect_opt: Option<ConnectFuture>,
 }
 
 impl Connecter {
-    pub fn from_lookup(handle: Handle, domain: &str, srv: &str, fallback_port: u16) -> Result<Connecter, FromStrError> {
-        let domain = DNameBuf::from_str(domain)?;
-        let srv = DNameBuf::from_str(srv)?;
-
-        let resolver = Resolver::new(&handle);
-        let lookup = lookup_srv(resolver.clone(), srv, domain, fallback_port);
+    pub fn from_lookup(domain: &str, srv: &str, fallback_port: u16) -> Result<Connecter, ConnecterError> {
+        let resolver_future = ResolverFuture::from_system_conf()?;
+        let name = format!("{}.{}.", srv, domain).into_name()?;
 
         Ok(Connecter {
-            handle,
-            resolver,
-            lookup: Some(lookup),
-            srvs: None,
-            connects: HashMap::new(),
+            fallback_port,
+            name,
+            domain: domain.into_name()?,
+            resolver_future,
+            resolver_opt: None,
+            srv_lookup_opt: None,
+            srvs_opt: None,
+            ip_lookup_opt: None,
+            ips_opt: None,
+            connect_opt: None,
         })
     }
 }
@@ -40,69 +48,104 @@ impl Future for Connecter {
     type Error = ConnecterError;
 
     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
-        match self.lookup.as_mut().map(|lookup| lookup.poll()) {
-            None | Some(Ok(Async::NotReady)) => (),
-            Some(Ok(Async::Ready(found_srvs))) => {
-                self.lookup = None;
-                match found_srvs {
-                    Some(srvs) =>
-                        self.srvs = Some(srvs.to_stream(self.resolver.clone())),
-                    None =>
-                        return Err(ConnecterError::NoSrv),
-                }
-            },
-            Some(Err(e)) =>
-                return Err(e.into()),
+        if self.resolver_opt.is_none() {
+            //println!("Poll resolver future");
+            match self.resolver_future.poll() {
+                Ok(Async::Ready(resolver)) =>
+                    self.resolver_opt = Some(resolver),
+                Ok(Async::NotReady) =>
+                    return Ok(Async::NotReady),
+                Err(e) =>
+                    return Err(e.into()),
+            }
         }
 
-        match self.srvs.as_mut().map(|srv| srv.poll()) {
-            None | Some(Ok(Async::NotReady)) => (),
-            Some(Ok(Async::Ready(None))) =>
-                self.srvs = None,
-            Some(Ok(Async::Ready(Some(srv_item)))) => {
-                let handle = &self.handle;
-                for addr in srv_item.to_socket_addrs() {
-                    self.connects.entry(addr)
-                        .or_insert_with(|| {
-                            // println!("Connect to {}", addr);
-                            TcpStream::connect(&addr, handle)
-                        });
+        if let Some(ref resolver) = self.resolver_opt {
+            if self.srvs_opt.is_none() {
+                if self.srv_lookup_opt.is_none() {
+                    //println!("Lookup srv: {:?}", self.name);
+                    self.srv_lookup_opt = Some(resolver.lookup_srv(&self.name));
                 }
-            },
-            Some(Err(e)) =>
-                return Err(e.into()),
-        }
 
-        let mut connected_stream = None;
-        self.connects.retain(|_, connect| {
-            if connected_stream.is_some() {
-                return false;
+                if let Some(ref mut srv_lookup) = self.srv_lookup_opt {
+                    match srv_lookup.poll() {
+                        Ok(Async::Ready(t)) => {
+                            let mut srvs = BTreeMap::new();
+                            for srv in t.iter() {
+                                srvs.insert(srv.priority(), srv.clone());
+                            }
+                            srvs.insert(65535, SRV::new(65535, 0, self.fallback_port, self.domain.clone()));
+                            self.srvs_opt = Some(srvs.into_iter());
+                        }
+                        Ok(Async::NotReady) => return Ok(Async::NotReady),
+                        Err(_) => {
+                            //println!("Ignore SVR error: {:?}", e);
+                            let mut srvs = BTreeMap::new();
+                            srvs.insert(65535, SRV::new(65535, 0, self.fallback_port, self.domain.clone()));
+                            self.srvs_opt = Some(srvs.into_iter());
+                        },
+                    }
+                }
             }
 
-            match connect.poll() {
-                Ok(Async::NotReady) => true,
-                Ok(Async::Ready(tcp_stream)) => {
-                    // Success!
-                    connected_stream = Some(tcp_stream);
-                    false
-                },
-                Err(_e) => {
-                    // println!("{}", _e);
-                    false
-                },
+            if self.connect_opt.is_none() {
+                if self.ips_opt.is_none() {
+                    if self.ip_lookup_opt.is_none() {
+                        if let Some(ref mut srvs) = self.srvs_opt {
+                            if let Some((_, srv)) = srvs.next() {
+                                //println!("Lookup ip: {:?}", srv);
+                                self.ip_lookup_opt = Some((srv.port(), resolver.lookup_ip(srv.target())));
+                            } else {
+                                return Err(ConnecterError::NoSrv);
+                            }
+                        }
+                    }
+
+                    if let Some((port, mut ip_lookup)) = mem::replace(&mut self.ip_lookup_opt, None) {
+                        match ip_lookup.poll() {
+                            Ok(Async::Ready(t)) => {
+                                let mut ip_deque = VecDeque::new();
+                                ip_deque.extend(t.iter());
+                                //println!("IPs: {:?}", ip_deque);
+                                self.ips_opt = Some((port, ip_deque));
+                                self.ip_lookup_opt = None;
+                            },
+                            Ok(Async::NotReady) => {
+                                self.ip_lookup_opt = Some((port, ip_lookup));
+                                return Ok(Async::NotReady)
+                            },
+                            Err(_) => {
+                                //println!("Ignore lookup error: {:?}", e);
+                                self.ip_lookup_opt = None;
+                            }
+                        }
+                    }
+                }
+
+                if let Some((port, mut ip_deque)) = mem::replace(&mut self.ips_opt, None) {
+                    if let Some(ip) = ip_deque.pop_front() {
+                        //println!("Connect to {:?}:{}", ip, port);
+                        self.connect_opt = Some(TcpStream::connect(&SocketAddr::new(ip, port)));
+                        self.ips_opt = Some((port, ip_deque));
+                    }
+                }
             }
-        });
-        if let Some(tcp_stream) = connected_stream {
-            return Ok(Async::Ready(tcp_stream));
-        }
 
-        if  self.lookup.is_none() &&
-            self.srvs.is_none() &&
-            self.connects.is_empty()
-        {
-            return Err(ConnecterError::AllFailed);
+            if let Some(mut connect_future) = mem::replace(&mut self.connect_opt, None) {
+                match connect_future.poll() {
+                    Ok(Async::Ready(t)) => return Ok(Async::Ready(t)),
+                    Ok(Async::NotReady) => {
+                        self.connect_opt = Some(connect_future);
+                        return Ok(Async::NotReady)
+                    }
+                    Err(_) => {
+                        //println!("Ignore connect error: {:?}", e);
+                    },
+                }
+            }
         }
 
         Ok(Async::NotReady)
     }
 }
+

src/lib.rs 🔗

@@ -3,7 +3,7 @@
 //! XMPP implemeentation with asynchronous I/O using Tokio.
 
 extern crate futures;
-extern crate tokio_core;
+extern crate tokio;
 extern crate tokio_io;
 extern crate tokio_codec;
 extern crate bytes;
@@ -14,7 +14,8 @@ extern crate native_tls;
 extern crate tokio_tls;
 extern crate sasl;
 extern crate jid;
-extern crate domain;
+extern crate trust_dns_resolver;
+extern crate trust_dns_proto;
 extern crate idna;
 extern crate xmpp_parsers;
 extern crate try_from;

src/starttls.rs 🔗

@@ -3,8 +3,8 @@ use futures::{Future, Sink, Poll, Async};
 use futures::stream::Stream;
 use futures::sink;
 use tokio_io::{AsyncRead, AsyncWrite};
-use tokio_tls::{TlsStream, TlsConnectorExt, ConnectAsync};
-use native_tls::TlsConnector;
+use tokio_tls::{TlsStream, TlsConnector, Connect};
+use native_tls::TlsConnector as NativeTlsConnector;
 use minidom::Element;
 use jid::Jid;
 
@@ -26,7 +26,7 @@ enum StartTlsClientState<S: AsyncRead + AsyncWrite> {
     Invalid,
     SendStartTls(sink::Send<XMPPStream<S>>),
     AwaitProceed(XMPPStream<S>),
-    StartingTls(ConnectAsync<S>),
+    StartingTls(Connect<S>),
 }
 
 impl<S: AsyncRead + AsyncWrite> StartTlsClient<S> {
@@ -54,7 +54,7 @@ impl<S: AsyncRead + AsyncWrite> Future for StartTlsClient<S> {
     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
         let old_state = replace(&mut self.state, StartTlsClientState::Invalid);
         let mut retry = false;
-        
+
         let (new_state, result) = match old_state {
             StartTlsClientState::SendStartTls(mut send) =>
                 match send.poll() {
@@ -74,9 +74,9 @@ impl<S: AsyncRead + AsyncWrite> Future for StartTlsClient<S> {
                         if stanza.name() == "proceed" =>
                     {
                         let stream = xmpp_stream.stream.into_inner();
-                        let connect = TlsConnector::builder().unwrap()
-                            .build().unwrap()
-                            .connect_async(&self.jid.domain, stream);
+                        let connect = TlsConnector::from(NativeTlsConnector::builder()
+                            .build().unwrap())
+                            .connect(&self.jid.domain, stream);
                         let new_state = StartTlsClientState::StartingTls(connect);
                         retry = true;
                         (new_state, Ok(Async::NotReady))