Detailed changes
@@ -12,18 +12,18 @@ keywords = ["xmpp", "tokio"]
[dependencies]
futures = "0.1"
-tokio-core = "0.1"
+tokio = "0.1"
tokio-io = "0.1"
tokio-codec = "0.1"
bytes = "0.4.9"
xml5ever = "0.12"
minidom = "0.9"
-# TODO: update to 0.2.0
-native-tls = "0.1"
-tokio-tls = "0.1"
+native-tls = "0.2"
+tokio-tls = "0.2"
sasl = "0.4"
jid = { version = "0.5", features = ["minidom"] }
-domain = "0.2"
+trust-dns-resolver = "0.9.1"
+trust-dns-proto = "0.4.0"
xmpp-parsers = "0.11"
idna = "0.1"
try_from = "0.2"
@@ -1,5 +1,5 @@
extern crate futures;
-extern crate tokio_core;
+extern crate tokio;
extern crate tokio_xmpp;
extern crate jid;
extern crate minidom;
@@ -9,8 +9,8 @@ extern crate try_from;
use std::env::args;
use std::process::exit;
use try_from::TryFrom;
-use tokio_core::reactor::Core;
use futures::{Stream, Sink, future};
+use tokio::runtime::current_thread::Runtime;
use tokio_xmpp::Client;
use minidom::Element;
use xmpp_parsers::presence::{Presence, Type as PresenceType, Show as PresenceShow};
@@ -27,9 +27,9 @@ fn main() {
let password = &args[2];
// tokio_core context
- let mut core = Core::new().unwrap();
+ let mut rt = Runtime::new().unwrap();
// Client instance
- let client = Client::new(jid, password, core.handle()).unwrap();
+ let client = Client::new(jid, password).unwrap();
// Make the two interfaces for sending and receiving independent
// of each other so we can move one into a closure.
@@ -64,7 +64,7 @@ fn main() {
});
// Start polling `done`
- match core.run(done) {
+ match rt.block_on(done) {
Ok(_) => (),
Err(e) => {
println!("Fatal: {}", e);
@@ -1,5 +1,5 @@
extern crate futures;
-extern crate tokio_core;
+extern crate tokio;
extern crate tokio_xmpp;
extern crate jid;
extern crate minidom;
@@ -10,7 +10,7 @@ use std::env::args;
use std::process::exit;
use std::str::FromStr;
use try_from::TryFrom;
-use tokio_core::reactor::Core;
+use tokio::runtime::current_thread::Runtime;
use futures::{Stream, Sink, future};
use tokio_xmpp::Component;
use minidom::Element;
@@ -30,10 +30,10 @@ fn main() {
let port: u16 = args.get(4).unwrap().parse().unwrap_or(5347u16);
// tokio_core context
- let mut core = Core::new().unwrap();
+ let mut rt = Runtime::new().unwrap();
// Component instance
- println!("{} {} {} {} {:?}", jid, password, server, port, core.handle());
- let component = Component::new(jid, password, server, port, core.handle()).unwrap();
+ println!("{} {} {} {}", jid, password, server, port);
+ let component = Component::new(jid, password, server, port).unwrap();
// Make the two interfaces for sending and receiving independent
// of each other so we can move one into a closure.
@@ -70,7 +70,7 @@ fn main() {
});
// Start polling `done`
- match core.run(done) {
+ match rt.block_on(done) {
Ok(_) => (),
Err(e) => {
println!("Fatal: {}", e);
@@ -1,8 +1,7 @@
use std::mem::replace;
use std::str::FromStr;
use std::error::Error;
-use tokio_core::reactor::Handle;
-use tokio_core::net::TcpStream;
+use tokio::net::TcpStream;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tls::TlsStream;
use futures::{future, Future, Stream, Poll, Async, Sink, StartSend, AsyncSink};
@@ -44,17 +43,17 @@ impl Client {
///
/// Start polling the returned instance so that it will connect
/// and yield events.
- pub fn new(jid: &str, password: &str, handle: Handle) -> Result<Self, JidParseError> {
+ pub fn new(jid: &str, password: &str) -> Result<Self, JidParseError> {
let jid = Jid::from_str(jid)?;
let password = password.to_owned();
- let connect = Self::make_connect(jid.clone(), password.clone(), handle);
+ let connect = Self::make_connect(jid.clone(), password.clone());
Ok(Client {
jid,
state: ClientState::Connecting(connect),
})
}
- fn make_connect(jid: Jid, password: String, handle: Handle) -> Box<Future<Item=XMPPStream, Error=String>> {
+ fn make_connect(jid: Jid, password: String) -> Box<Future<Item=XMPPStream, Error=String>> {
let username = jid.node.as_ref().unwrap().to_owned();
let jid1 = jid.clone();
let jid2 = jid.clone();
@@ -66,7 +65,7 @@ impl Client {
return Box::new(future::err(format!("{:?}", e))),
};
Box::new(
- Connecter::from_lookup(handle, &domain, "_xmpp-client._tcp", 5222)
+ Connecter::from_lookup(&domain, "_xmpp-client._tcp", 5222)
.expect("Connector::from_lookup")
.and_then(move |tcp_stream|
xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_CLIENT.to_owned())
@@ -4,8 +4,7 @@
use std::mem::replace;
use std::str::FromStr;
use std::error::Error;
-use tokio_core::reactor::Handle;
-use tokio_core::net::TcpStream;
+use tokio::net::TcpStream;
use tokio_io::{AsyncRead, AsyncWrite};
use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink};
use minidom::Element;
@@ -41,21 +40,21 @@ impl Component {
///
/// Start polling the returned instance so that it will connect
/// and yield events.
- pub fn new(jid: &str, password: &str, server: &str, port: u16, handle: Handle) -> Result<Self, JidParseError> {
+ pub fn new(jid: &str, password: &str, server: &str, port: u16) -> Result<Self, JidParseError> {
let jid = Jid::from_str(jid)?;
let password = password.to_owned();
- let connect = Self::make_connect(jid.clone(), password, server, port, handle);
+ let connect = Self::make_connect(jid.clone(), password, server, port);
Ok(Component {
jid,
state: ComponentState::Connecting(connect),
})
}
- fn make_connect(jid: Jid, password: String, server: &str, port: u16, handle: Handle) -> Box<Future<Item=XMPPStream, Error=String>> {
+ fn make_connect(jid: Jid, password: String, server: &str, port: u16) -> Box<Future<Item=XMPPStream, Error=String>> {
let jid1 = jid.clone();
let password = password;
Box::new(
- Connecter::from_lookup(handle, server, "_xmpp-component._tcp", port)
+ Connecter::from_lookup(server, "_xmpp-component._tcp", port)
.expect("Connector::from_lookup")
.and_then(move |tcp_stream| {
xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_COMPONENT_ACCEPT.to_owned())
@@ -1,37 +1,46 @@
-use std::str::FromStr;
-use std::collections::HashMap;
-use std::net::SocketAddr;
-use futures::{Future, Poll, Async, Stream};
-use tokio_core::reactor::Handle;
-use tokio_core::net::{TcpStream, TcpStreamNew};
-use domain::resolv::Resolver;
-use domain::resolv::lookup::srv::{lookup_srv, LookupSrv, LookupSrvStream};
-use domain::bits::DNameBuf;
+use std::mem;
+use std::net::{SocketAddr, IpAddr};
+use std::collections::{BTreeMap, btree_map};
+use std::collections::VecDeque;
+use futures::{Future, Poll, Async};
+use tokio::net::{ConnectFuture, TcpStream};
+use trust_dns_resolver::{IntoName, Name, ResolverFuture, error::ResolveError};
+use trust_dns_resolver::lookup::SrvLookupFuture;
+use trust_dns_resolver::lookup_ip::LookupIpFuture;
+use trust_dns_proto::rr::rdata::srv::SRV;
pub struct Connecter {
- handle: Handle,
- resolver: Resolver,
- lookup: Option<LookupSrv>,
- srvs: Option<LookupSrvStream>,
- connects: HashMap<SocketAddr, TcpStreamNew>,
+ fallback_port: u16,
+ name: Name,
+ domain: Name,
+ resolver_future: Box<Future<Item = ResolverFuture, Error = ResolveError> + Send>,
+ resolver_opt: Option<ResolverFuture>,
+ srv_lookup_opt: Option<SrvLookupFuture>,
+ srvs_opt: Option<btree_map::IntoIter<u16, SRV>>,
+ ip_lookup_opt: Option<(u16, LookupIpFuture)>,
+ ips_opt: Option<(u16, VecDeque<IpAddr>)>,
+ connect_opt: Option<ConnectFuture>,
}
impl Connecter {
- pub fn from_lookup(handle: Handle, domain: &str, srv: &str, fallback_port: u16) -> Result<Connecter, String> {
- let domain = DNameBuf::from_str(domain)
- .map_err(|e| format!("{}", e))?;
- let srv = DNameBuf::from_str(srv)
- .map_err(|e| format!("{}", e))?;
+ pub fn from_lookup(domain: &str, srv: &str, fallback_port: u16) -> Result<Connecter, String> {
+ let resolver_future = ResolverFuture::from_system_conf()
+ .map_err(|e| format!("Configure resolver: {:?}", e))?;
- let resolver = Resolver::new(&handle);
- let lookup = lookup_srv(resolver.clone(), srv, domain, fallback_port);
+ let name = format!("{}.{}.", srv, domain).into_name()
+ .map_err(|e| format!("Parse service name: {:?}", e))?;
Ok(Connecter {
- handle,
- resolver,
- lookup: Some(lookup),
- srvs: None,
- connects: HashMap::new(),
+ fallback_port,
+ name,
+ domain: domain.into_name().map_err(|e| format!("Parse domain name: {:?}", e))?,
+ resolver_future,
+ resolver_opt: None,
+ srv_lookup_opt: None,
+ srvs_opt: None,
+ ip_lookup_opt: None,
+ ips_opt: None,
+ connect_opt: None,
})
}
}
@@ -41,69 +50,104 @@ impl Future for Connecter {
type Error = String;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- match self.lookup.as_mut().map(|lookup| lookup.poll()) {
- None | Some(Ok(Async::NotReady)) => (),
- Some(Ok(Async::Ready(found_srvs))) => {
- self.lookup = None;
- match found_srvs {
- Some(srvs) =>
- self.srvs = Some(srvs.to_stream(self.resolver.clone())),
- None =>
- return Err("No SRV records".to_owned()),
+ if self.resolver_opt.is_none() {
+ //println!("Poll resolver future");
+ match self.resolver_future.poll() {
+ Ok(Async::Ready(resolver)) => {
+ self.resolver_opt = Some(resolver);
}
- },
- Some(Err(e)) =>
- return Err(format!("{}", e)),
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Err(e) => return Err(format!("Cann't get resolver: {:?}", e)),
+ }
}
- match self.srvs.as_mut().map(|srv| srv.poll()) {
- None | Some(Ok(Async::NotReady)) => (),
- Some(Ok(Async::Ready(None))) =>
- self.srvs = None,
- Some(Ok(Async::Ready(Some(srv_item)))) => {
- let handle = &self.handle;
- for addr in srv_item.to_socket_addrs() {
- self.connects.entry(addr)
- .or_insert_with(|| {
- // println!("Connect to {}", addr);
- TcpStream::connect(&addr, handle)
- });
+ if let Some(ref resolver) = self.resolver_opt {
+ if self.srvs_opt.is_none() {
+ if self.srv_lookup_opt.is_none() {
+ //println!("Lookup srv: {:?}", self.name);
+ self.srv_lookup_opt = Some(resolver.lookup_srv(&self.name));
}
- },
- Some(Err(e)) =>
- return Err(format!("{}", e)),
- }
- let mut connected_stream = None;
- self.connects.retain(|_, connect| {
- if connected_stream.is_some() {
- return false;
+ if let Some(ref mut srv_lookup) = self.srv_lookup_opt {
+ match srv_lookup.poll() {
+ Ok(Async::Ready(t)) => {
+ let mut srvs = BTreeMap::new();
+ for srv in t.iter() {
+ srvs.insert(srv.priority(), srv.clone());
+ }
+ srvs.insert(65535, SRV::new(65535, 0, self.fallback_port, self.domain.clone()));
+ self.srvs_opt = Some(srvs.into_iter());
+ }
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Err(_) => {
+ //println!("Ignore SVR error: {:?}", e);
+ let mut srvs = BTreeMap::new();
+ srvs.insert(65535, SRV::new(65535, 0, self.fallback_port, self.domain.clone()));
+ self.srvs_opt = Some(srvs.into_iter());
+ },
+ }
+ }
+ }
+
+ if self.connect_opt.is_none() {
+ if self.ips_opt.is_none() {
+ if self.ip_lookup_opt.is_none() {
+ if let Some(ref mut srvs) = self.srvs_opt {
+ if let Some((_, srv)) = srvs.next() {
+ //println!("Lookup ip: {:?}", srv);
+ self.ip_lookup_opt = Some((srv.port(), resolver.lookup_ip(srv.target())));
+ } else {
+ return Err("Cann't connect".to_string());
+ }
+ }
+ }
+
+ if let Some((port, mut ip_lookup)) = mem::replace(&mut self.ip_lookup_opt, None) {
+ match ip_lookup.poll() {
+ Ok(Async::Ready(t)) => {
+ let mut ip_deque = VecDeque::new();
+ ip_deque.extend(t.iter());
+ //println!("IPs: {:?}", ip_deque);
+ self.ips_opt = Some((port, ip_deque));
+ self.ip_lookup_opt = None;
+ },
+ Ok(Async::NotReady) => {
+ self.ip_lookup_opt = Some((port, ip_lookup));
+ return Ok(Async::NotReady)
+ },
+ Err(_) => {
+ //println!("Ignore lookup error: {:?}", e);
+ self.ip_lookup_opt = None;
+ }
+ }
+ }
+ }
+
+ if let Some((port, mut ip_deque)) = mem::replace(&mut self.ips_opt, None) {
+ if let Some(ip) = ip_deque.pop_front() {
+ //println!("Connect to {:?}:{}", ip, port);
+ self.connect_opt = Some(TcpStream::connect(&SocketAddr::new(ip, port)));
+ self.ips_opt = Some((port, ip_deque));
+ }
+ }
}
- match connect.poll() {
- Ok(Async::NotReady) => true,
- Ok(Async::Ready(tcp_stream)) => {
- // Success!
- connected_stream = Some(tcp_stream);
- false
- },
- Err(_e) => {
- // println!("{}", _e);
- false
- },
+ if let Some(mut connect_future) = mem::replace(&mut self.connect_opt, None) {
+ match connect_future.poll() {
+ Ok(Async::Ready(t)) => return Ok(Async::Ready(t)),
+ Ok(Async::NotReady) => {
+ self.connect_opt = Some(connect_future);
+ return Ok(Async::NotReady)
+ }
+ Err(_) => {
+ //println!("Ignore connect error: {:?}", e);
+ },
+ }
}
- });
- if let Some(tcp_stream) = connected_stream {
- return Ok(Async::Ready(tcp_stream));
- }
- if self.lookup.is_none() &&
- self.srvs.is_none() &&
- self.connects.is_empty()
- {
- return Err("All connection attempts failed".to_owned());
}
Ok(Async::NotReady)
}
}
+
@@ -3,7 +3,7 @@
//! XMPP implemeentation with asynchronous I/O using Tokio.
extern crate futures;
-extern crate tokio_core;
+extern crate tokio;
extern crate tokio_io;
extern crate tokio_codec;
extern crate bytes;
@@ -14,7 +14,8 @@ extern crate native_tls;
extern crate tokio_tls;
extern crate sasl;
extern crate jid;
-extern crate domain;
+extern crate trust_dns_resolver;
+extern crate trust_dns_proto;
extern crate idna;
extern crate xmpp_parsers;
extern crate try_from;
@@ -3,8 +3,8 @@ use futures::{Future, Sink, Poll, Async};
use futures::stream::Stream;
use futures::sink;
use tokio_io::{AsyncRead, AsyncWrite};
-use tokio_tls::{TlsStream, TlsConnectorExt, ConnectAsync};
-use native_tls::TlsConnector;
+use tokio_tls::{TlsStream, TlsConnector, Connect};
+use native_tls::TlsConnector as NativeTlsConnector;
use minidom::Element;
use jid::Jid;
@@ -25,7 +25,7 @@ enum StartTlsClientState<S: AsyncRead + AsyncWrite> {
Invalid,
SendStartTls(sink::Send<XMPPStream<S>>),
AwaitProceed(XMPPStream<S>),
- StartingTls(ConnectAsync<S>),
+ StartingTls(Connect<S>),
}
impl<S: AsyncRead + AsyncWrite> StartTlsClient<S> {
@@ -53,7 +53,7 @@ impl<S: AsyncRead + AsyncWrite> Future for StartTlsClient<S> {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let old_state = replace(&mut self.state, StartTlsClientState::Invalid);
let mut retry = false;
-
+
let (new_state, result) = match old_state {
StartTlsClientState::SendStartTls(mut send) =>
match send.poll() {
@@ -73,9 +73,9 @@ impl<S: AsyncRead + AsyncWrite> Future for StartTlsClient<S> {
if stanza.name() == "proceed" =>
{
let stream = xmpp_stream.stream.into_inner();
- let connect = TlsConnector::builder().unwrap()
- .build().unwrap()
- .connect_async(&self.jid.domain, stream);
+ let connect = TlsConnector::from(NativeTlsConnector::builder()
+ .build().unwrap())
+ .connect(&self.jid.domain, stream);
let new_state = StartTlsClientState::StartingTls(connect);
retry = true;
(new_state, Ok(Async::NotReady))