1use std::str::FromStr;
2use std::collections::HashMap;
3use std::net::SocketAddr;
4use futures::{Future, Poll, Async, Stream};
5use tokio_core::reactor::Handle;
6use tokio_core::net::{TcpStream, TcpStreamNew};
7use domain::resolv::Resolver;
8use domain::resolv::lookup::srv::{lookup_srv, LookupSrv, LookupSrvStream};
9use domain::bits::DNameBuf;
10
11pub struct Connecter {
12 handle: Handle,
13 resolver: Resolver,
14 lookup: Option<LookupSrv>,
15 srvs: Option<LookupSrvStream>,
16 connects: HashMap<SocketAddr, TcpStreamNew>,
17}
18
19impl Connecter {
20 pub fn from_lookup(handle: Handle, domain: &str, srv: &str, fallback_port: u16) -> Result<Connecter, String> {
21 let domain = try!(
22 DNameBuf::from_str(domain)
23 .map_err(|e| format!("{}", e))
24 );
25 let srv = try!(
26 DNameBuf::from_str(srv)
27 .map_err(|e| format!("{}", e))
28 );
29
30 let resolver = Resolver::new(&handle);
31 let lookup = lookup_srv(resolver.clone(), srv, domain, fallback_port);
32
33 Ok(Connecter {
34 handle,
35 resolver,
36 lookup: Some(lookup),
37 srvs: None,
38 connects: HashMap::new(),
39 })
40 }
41}
42
43impl Future for Connecter {
44 type Item = TcpStream;
45 type Error = String;
46
47 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
48 match self.lookup.as_mut().map(|mut lookup| lookup.poll()) {
49 None | Some(Ok(Async::NotReady)) => (),
50 Some(Ok(Async::Ready(found_srvs))) => {
51 self.lookup = None;
52 match found_srvs {
53 Some(srvs) =>
54 self.srvs = Some(srvs.to_stream(self.resolver.clone())),
55 None =>
56 return Err("No SRV records".to_owned()),
57 }
58 },
59 Some(Err(e)) =>
60 return Err(format!("{}", e)),
61 }
62
63 match self.srvs.as_mut().map(|mut srv| srv.poll()) {
64 None | Some(Ok(Async::NotReady)) => (),
65 Some(Ok(Async::Ready(None))) =>
66 self.srvs = None,
67 Some(Ok(Async::Ready(Some(srv_item)))) => {
68 let handle = &self.handle;
69 for addr in srv_item.to_socket_addrs() {
70 self.connects.entry(addr)
71 .or_insert_with(|| {
72 println!("Connect to {}", addr);
73 TcpStream::connect(&addr, handle)
74 });
75 }
76 },
77 Some(Err(e)) =>
78 return Err(format!("{}", e)),
79 }
80
81 let mut connected_stream = None;
82 self.connects.retain(|_, connect| {
83 if connected_stream.is_some() {
84 return false;
85 }
86
87 match connect.poll() {
88 Ok(Async::NotReady) => true,
89 Ok(Async::Ready(tcp_stream)) => {
90 // Success!
91 connected_stream = Some(tcp_stream);
92 false
93 },
94 Err(e) => {
95 println!("{}", e);
96 false
97 },
98 }
99 });
100 if let Some(tcp_stream) = connected_stream {
101 return Ok(Async::Ready(tcp_stream));
102 }
103
104 if self.lookup.is_none() &&
105 self.srvs.is_none() &&
106 self.connects.is_empty()
107 {
108 return Err("All connection attempts failed".to_owned());
109 }
110
111 Ok(Async::NotReady)
112 }
113}