1use std::mem::replace;
2use std::str::FromStr;
3use std::error::Error;
4use tokio_core::reactor::{Core, Handle};
5use tokio_core::net::TcpStream;
6use tokio_io::{AsyncRead, AsyncWrite};
7use tokio_tls::TlsStream;
8use futures::*;
9use jid::{Jid, JidParseError};
10use xml;
11use sasl::common::{Credentials, ChannelBinding};
12
13use super::xmpp_codec::Packet;
14use super::xmpp_stream;
15use super::tcp::TcpClient;
16use super::starttls::{NS_XMPP_TLS, StartTlsClient};
17
18mod auth;
19use self::auth::*;
20mod bind;
21use self::bind::*;
22
23pub struct Client {
24 pub jid: Jid,
25 password: String,
26 state: ClientState,
27}
28
29type XMPPStream = xmpp_stream::XMPPStream<TlsStream<TcpStream>>;
30
31enum ClientState {
32 Invalid,
33 Disconnected,
34 Connecting(Box<Future<Item=XMPPStream, Error=String>>),
35 Connected(XMPPStream),
36 // Sending,
37 // Drain,
38}
39
40impl Client {
41 pub fn new(jid: &str, password: &str, handle: &Handle) -> Result<Self, JidParseError> {
42 let jid = try!(Jid::from_str(jid));
43 let password = password.to_owned();
44 let connect = Self::make_connect(jid.clone(), password.clone(), handle);
45 Ok(Client {
46 jid, password,
47 state: ClientState::Connecting(connect),
48 })
49 }
50
51 fn make_connect(jid: Jid, password: String, handle: &Handle) -> Box<Future<Item=XMPPStream, Error=String>> {
52 use std::net::ToSocketAddrs;
53 let addr = "89.238.79.220:5222"
54 .to_socket_addrs().unwrap()
55 .next().unwrap();
56 let username = jid.node.as_ref().unwrap().to_owned();
57 let password = password;
58 Box::new(
59 TcpClient::connect(
60 jid,
61 &addr,
62 handle
63 ).map_err(|e| format!("{}", e)
64 ).and_then(|stream| {
65 if Self::can_starttls(&stream) {
66 Self::starttls(stream)
67 } else {
68 panic!("No STARTTLS")
69 }
70 }).and_then(move |stream| {
71 Self::auth(stream, username, password).expect("auth")
72 }).and_then(|stream| {
73 Self::bind(stream)
74 }).and_then(|stream| {
75 println!("Bound to {}", stream.jid);
76
77 let presence = xml::Element::new("presence".to_owned(), None, vec![]);
78 stream.send(Packet::Stanza(presence))
79 .map_err(|e| format!("{}", e))
80 })
81 )
82 }
83
84 fn can_starttls<S>(stream: &xmpp_stream::XMPPStream<S>) -> bool {
85 stream.stream_features
86 .get_child("starttls", Some(NS_XMPP_TLS))
87 .is_some()
88 }
89
90 fn starttls<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> StartTlsClient<S> {
91 StartTlsClient::from_stream(stream)
92 }
93
94 fn auth<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>, username: String, password: String) -> Result<ClientAuth<S>, String> {
95 let creds = Credentials::default()
96 .with_username(username)
97 .with_password(password)
98 .with_channel_binding(ChannelBinding::None);
99 ClientAuth::new(stream, creds)
100 }
101
102 fn bind<S: AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> ClientBind<S> {
103 ClientBind::new(stream)
104 }
105}
106
107#[derive(Debug)]
108pub enum ClientEvent {
109 Online,
110 Disconnected,
111 Stanza(xml::Element),
112}
113
114impl Stream for Client {
115 type Item = ClientEvent;
116 type Error = String;
117
118 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
119 println!("stream.poll");
120 let state = replace(&mut self.state, ClientState::Invalid);
121
122 match state {
123 ClientState::Invalid =>
124 Err("invalid client state".to_owned()),
125 ClientState::Disconnected =>
126 Ok(Async::NotReady),
127 ClientState::Connecting(mut connect) => {
128 match connect.poll() {
129 Ok(Async::Ready(stream)) => {
130 println!("connected");
131 self.state = ClientState::Connected(stream);
132 self.poll()
133 },
134 Ok(Async::NotReady) => {
135 self.state = ClientState::Connecting(connect);
136 Ok(Async::NotReady)
137 },
138 Err(e) =>
139 Err(e),
140 }
141 },
142 ClientState::Connected(mut stream) => {
143 match stream.poll() {
144 Ok(Async::NotReady) => {
145 self.state = ClientState::Connected(stream);
146 Ok(Async::NotReady)
147 },
148 Ok(Async::Ready(None)) => {
149 // EOF
150 self.state = ClientState::Disconnected;
151 Ok(Async::Ready(Some(ClientEvent::Disconnected)))
152 },
153 Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
154 self.state = ClientState::Connected(stream);
155 Ok(Async::Ready(Some(ClientEvent::Stanza(stanza))))
156 },
157 Ok(Async::Ready(_)) => {
158 self.state = ClientState::Connected(stream);
159 Ok(Async::NotReady)
160 },
161 Err(e) =>
162 Err(e.description().to_owned()),
163 }
164 },
165 }
166 }
167}