implement SRV lookup

Astro created

Change summary

Cargo.toml            |  3 
src/client/mod.rs     | 48 ++++++++++------------
src/happy_eyeballs.rs | 91 +++++++++++++++++++++++++++++++++++++++++++++
src/lib.rs            |  3 
src/tcp.rs            |  8 +++
5 files changed, 125 insertions(+), 28 deletions(-)

Detailed changes

Cargo.toml 🔗

@@ -5,7 +5,7 @@ authors = ["Astro <astro@spaceboyz.net>"]
 
 [dependencies]
 futures = "*"
-tokio-core = "*"
+tokio-core = "0.1.7"
 tokio-io = "*"
 bytes = "*"
 RustyXML = "*"
@@ -14,3 +14,4 @@ tokio-tls = "*"
 sasl = "*"
 rustc-serialize = "*"
 jid = "*"
+domain = "0.2.1"

src/client/mod.rs 🔗

@@ -14,6 +14,7 @@ use super::xmpp_codec::Packet;
 use super::xmpp_stream;
 use super::tcp::TcpClient;
 use super::starttls::{NS_XMPP_TLS, StartTlsClient};
+use super::happy_eyeballs::Connecter;
 
 mod auth;
 use self::auth::*;
@@ -37,7 +38,7 @@ enum ClientState {
 }
 
 impl Client {
-    pub fn new(jid: &str, password: &str, handle: &Handle) -> Result<Self, JidParseError> {
+    pub fn new(jid: &str, password: &str, handle: Handle) -> Result<Self, JidParseError> {
         let jid = try!(Jid::from_str(jid));
         let password = password.to_owned();
         let connect = Self::make_connect(jid.clone(), password.clone(), handle);
@@ -47,34 +48,29 @@ impl Client {
         })
     }
 
-    fn make_connect(jid: Jid, password: String, handle: &Handle) -> Box<Future<Item=XMPPStream, Error=String>> {
-        // TODO: implement proper DNS SRV lookup
-        use std::net::ToSocketAddrs;
-        let addr = "89.238.79.220:5222"
-            .to_socket_addrs().unwrap()
-            .next().unwrap();
+    fn make_connect(jid: Jid, password: String, handle: Handle) -> Box<Future<Item=XMPPStream, Error=String>> {
         let username = jid.node.as_ref().unwrap().to_owned();
         let password = password;
         Box::new(
-            TcpClient::connect(
-                jid,
-                &addr,
-                handle
-            ).map_err(|e| format!("{}", e)
-            ).and_then(|stream| {
-                if Self::can_starttls(&stream) {
-                    Self::starttls(stream)
-                } else {
-                    panic!("No STARTTLS")
-                }
-            }).and_then(move |stream| {
-                Self::auth(stream, username, password).expect("auth")
-            }).and_then(|stream| {
-                Self::bind(stream)
-            }).and_then(|stream| {
-                println!("Bound to {}", stream.jid);
-                Ok(stream)
-            })
+            Connecter::from_lookup(handle, &jid.domain, "_xmpp-client._tcp", 5222)
+                .expect("Connector::from_lookup")
+                .and_then(|tcp_stream|
+                          TcpClient::from_stream(jid, tcp_stream)
+                          .map_err(|e| format!("{}", e))
+                ).and_then(|stream| {
+                    if Self::can_starttls(&stream) {
+                        Self::starttls(stream)
+                    } else {
+                        panic!("No STARTTLS")
+                    }
+                }).and_then(move |stream| {
+                    Self::auth(stream, username, password).expect("auth")
+                }).and_then(|stream| {
+                    Self::bind(stream)
+                }).and_then(|stream| {
+                    println!("Bound to {}", stream.jid);
+                    Ok(stream)
+                })
         )
     }
 

src/happy_eyeballs.rs 🔗

@@ -0,0 +1,91 @@
+use std::str::FromStr;
+use futures::*;
+use tokio_core::reactor::Handle;
+use tokio_core::net::{TcpStream, TcpStreamNew};
+use domain::resolv::Resolver;
+use domain::resolv::lookup::srv::*;
+use domain::bits::DNameBuf;
+
+pub struct Connecter {
+    handle: Handle,
+    resolver: Resolver,
+    lookup: Option<LookupSrv>,
+    srvs: Option<LookupSrvStream>,
+    connects: Vec<TcpStreamNew>,
+}
+
+impl Connecter {
+    pub fn from_lookup(handle: Handle, domain: &str, srv: &str, fallback_port: u16) -> Result<Connecter, String> {
+        let domain = try!(
+            DNameBuf::from_str(domain)
+                .map_err(|e| format!("{}", e))
+        );
+        let srv = try!(
+            DNameBuf::from_str(srv)
+                .map_err(|e| format!("{}", e))
+        );
+
+        let resolver = Resolver::new(&handle);
+        let lookup = lookup_srv(resolver.clone(), srv, domain, fallback_port);
+
+        Ok(Connecter {
+            handle,
+            resolver,
+            lookup: Some(lookup),
+            srvs: None,
+            connects: vec![],
+        })
+    }
+}
+
+impl Future for Connecter {
+    type Item = TcpStream;
+    type Error = String;
+
+    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+        match self.lookup.as_mut().map(|mut lookup| lookup.poll()) {
+            None => (),
+            Some(Ok(Async::NotReady)) => (),
+            Some(Ok(Async::Ready(found_srvs))) => {
+                self.lookup = None;
+                match found_srvs {
+                    Some(srvs) =>
+                        self.srvs = Some(srvs.to_stream(self.resolver.clone())),
+                    None =>
+                        return Err("No SRV records".to_owned()),
+                }
+            },
+            Some(Err(e)) =>
+                return Err(format!("{}", e)),
+        }
+
+        match self.srvs.as_mut().map(|mut srv| srv.poll()) {
+            None => (),
+            Some(Ok(Async::NotReady)) => (),
+            Some(Ok(Async::Ready(None))) =>
+                self.srvs = None,
+            Some(Ok(Async::Ready(Some(srv_item)))) => {
+                for addr in srv_item.to_socket_addrs() {
+                    println!("Connect to {}", addr);
+                    let connect = TcpStream::connect(&addr, &self.handle);
+                    self.connects.push(connect);
+                }
+            },
+            Some(Err(e)) =>
+                return Err(format!("{}", e)),
+        }
+
+        for mut connect in &mut self.connects {
+            match connect.poll() {
+                Ok(Async::NotReady) => (),
+                Ok(Async::Ready(tcp_stream)) =>
+                    // Success!
+                    return Ok(Async::Ready(tcp_stream)),
+                Err(e) =>
+                    return Err(format!("{}", e)),
+            }
+        }
+        
+        Ok(Async::NotReady)
+    }
+}

src/lib.rs 🔗

@@ -9,7 +9,7 @@ extern crate tokio_tls;
 extern crate sasl;
 extern crate rustc_serialize as serialize;
 extern crate jid;
-
+extern crate domain;
 
 pub mod xmpp_codec;
 pub mod xmpp_stream;
@@ -18,5 +18,6 @@ mod tcp;
 pub use tcp::*;
 mod starttls;
 pub use starttls::*;
+mod happy_eyeballs;
 mod client;
 pub use client::{Client, ClientEvent};

src/tcp.rs 🔗

@@ -27,6 +27,14 @@ impl TcpClient {
             jid,
         }
     }
+
+    pub fn from_stream(jid: Jid, tcp_stream: TcpStream) -> Self {
+        let start = XMPPStream::from_stream(tcp_stream, jid.clone());
+        TcpClient {
+            state: TcpClientState::Start(start),
+            jid,
+        }
+    }
 }
 
 impl Future for TcpClient {