happy_eyeballs.rs

  1use std::str::FromStr;
  2use std::collections::HashMap;
  3use std::net::SocketAddr;
  4use futures::{Future, Poll, Async, Stream};
  5use tokio_core::reactor::Handle;
  6use tokio_core::net::{TcpStream, TcpStreamNew};
  7use domain::resolv::Resolver;
  8use domain::resolv::lookup::srv::{lookup_srv, LookupSrv, LookupSrvStream};
  9use domain::bits::DNameBuf;
 10
 11pub struct Connecter {
 12    handle: Handle,
 13    resolver: Resolver,
 14    lookup: Option<LookupSrv>,
 15    srvs: Option<LookupSrvStream>,
 16    connects: HashMap<SocketAddr, TcpStreamNew>,
 17}
 18
 19impl Connecter {
 20    pub fn from_lookup(handle: Handle, domain: &str, srv: &str, fallback_port: u16) -> Result<Connecter, String> {
 21        let domain = try!(
 22            DNameBuf::from_str(domain)
 23                .map_err(|e| format!("{}", e))
 24        );
 25        let srv = try!(
 26            DNameBuf::from_str(srv)
 27                .map_err(|e| format!("{}", e))
 28        );
 29
 30        let resolver = Resolver::new(&handle);
 31        let lookup = lookup_srv(resolver.clone(), srv, domain, fallback_port);
 32
 33        Ok(Connecter {
 34            handle,
 35            resolver,
 36            lookup: Some(lookup),
 37            srvs: None,
 38            connects: HashMap::new(),
 39        })
 40    }
 41}
 42
 43impl Future for Connecter {
 44    type Item = TcpStream;
 45    type Error = String;
 46
 47    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
 48        match self.lookup.as_mut().map(|mut lookup| lookup.poll()) {
 49            None | Some(Ok(Async::NotReady)) => (),
 50            Some(Ok(Async::Ready(found_srvs))) => {
 51                self.lookup = None;
 52                match found_srvs {
 53                    Some(srvs) =>
 54                        self.srvs = Some(srvs.to_stream(self.resolver.clone())),
 55                    None =>
 56                        return Err("No SRV records".to_owned()),
 57                }
 58            },
 59            Some(Err(e)) =>
 60                return Err(format!("{}", e)),
 61        }
 62
 63        match self.srvs.as_mut().map(|mut srv| srv.poll()) {
 64            None | Some(Ok(Async::NotReady)) => (),
 65            Some(Ok(Async::Ready(None))) =>
 66                self.srvs = None,
 67            Some(Ok(Async::Ready(Some(srv_item)))) => {
 68                let handle = &self.handle;
 69                for addr in srv_item.to_socket_addrs() {
 70                    self.connects.entry(addr)
 71                        .or_insert_with(|| {
 72                            println!("Connect to {}", addr);
 73                            TcpStream::connect(&addr, handle)
 74                        });
 75                }
 76            },
 77            Some(Err(e)) =>
 78                return Err(format!("{}", e)),
 79        }
 80
 81        let mut connected_stream = None;
 82        self.connects.retain(|_, connect| {
 83            if connected_stream.is_some() {
 84                return false;
 85            }
 86
 87            match connect.poll() {
 88                Ok(Async::NotReady) => true,
 89                Ok(Async::Ready(tcp_stream)) => {
 90                    // Success!
 91                    connected_stream = Some(tcp_stream);
 92                    false
 93                },
 94                Err(e) => {
 95                    println!("{}", e);
 96                    false
 97                },
 98            }
 99        });
100        if let Some(tcp_stream) = connected_stream {
101            return Ok(Async::Ready(tcp_stream));
102        }
103
104        if  self.lookup.is_none() &&
105            self.srvs.is_none() &&
106            self.connects.is_empty()
107        {
108            return Err("All connection attempts failed".to_owned());
109        }
110
111        Ok(Async::NotReady)
112    }
113}