1use std::mem::replace;
2use std::str::FromStr;
3use std::error::Error;
4use tokio_core::reactor::Handle;
5use tokio_core::net::TcpStream;
6use tokio_io::{AsyncRead, AsyncWrite};
7use tokio_tls::TlsStream;
8use futures::*;
9use jid::{Jid, JidParseError};
10use xml;
11use sasl::common::{Credentials, ChannelBinding};
12
13use super::xmpp_codec::Packet;
14use super::xmpp_stream;
15use super::tcp::TcpClient;
16use super::starttls::{NS_XMPP_TLS, StartTlsClient};
17
18mod auth;
19use self::auth::*;
20mod bind;
21use self::bind::*;
22
23pub struct Client {
24 pub jid: Jid,
25 state: ClientState,
26}
27
28type XMPPStream = xmpp_stream::XMPPStream<TlsStream<TcpStream>>;
29
30enum ClientState {
31 Invalid,
32 Disconnected,
33 Connecting(Box<Future<Item=XMPPStream, Error=String>>),
34 Connected(XMPPStream),
35}
36
37impl Client {
38 pub fn new(jid: &str, password: &str, handle: &Handle) -> Result<Self, JidParseError> {
39 let jid = try!(Jid::from_str(jid));
40 let password = password.to_owned();
41 let connect = Self::make_connect(jid.clone(), password.clone(), handle);
42 Ok(Client {
43 jid,
44 state: ClientState::Connecting(connect),
45 })
46 }
47
48 fn make_connect(jid: Jid, password: String, handle: &Handle) -> Box<Future<Item=XMPPStream, Error=String>> {
49 // TODO: implement proper DNS SRV lookup
50 use std::net::ToSocketAddrs;
51 let addr = "89.238.79.220:5222"
52 .to_socket_addrs().unwrap()
53 .next().unwrap();
54 let username = jid.node.as_ref().unwrap().to_owned();
55 let password = password;
56 Box::new(
57 TcpClient::connect(
58 jid,
59 &addr,
60 handle
61 ).map_err(|e| format!("{}", e)
62 ).and_then(|stream| {
63 if Self::can_starttls(&stream) {
64 Self::starttls(stream)
65 } else {
66 panic!("No STARTTLS")
67 }
68 }).and_then(move |stream| {
69 Self::auth(stream, username, password).expect("auth")
70 }).and_then(|stream| {
71 Self::bind(stream)
72 }).and_then(|stream| {
73 println!("Bound to {}", stream.jid);
74 Ok(stream)
75 })
76 )
77 }
78
79 fn can_starttls<S>(stream: &xmpp_stream::XMPPStream<S>) -> bool {
80 stream.stream_features
81 .get_child("starttls", Some(NS_XMPP_TLS))
82 .is_some()
83 }
84
85 fn starttls<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> StartTlsClient<S> {
86 StartTlsClient::from_stream(stream)
87 }
88
89 fn auth<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>, username: String, password: String) -> Result<ClientAuth<S>, String> {
90 let creds = Credentials::default()
91 .with_username(username)
92 .with_password(password)
93 .with_channel_binding(ChannelBinding::None);
94 ClientAuth::new(stream, creds)
95 }
96
97 fn bind<S: AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> ClientBind<S> {
98 ClientBind::new(stream)
99 }
100}
101
102#[derive(Debug)]
103pub enum ClientEvent {
104 Online,
105 Disconnected,
106 Stanza(xml::Element),
107}
108
109impl Stream for Client {
110 type Item = ClientEvent;
111 type Error = String;
112
113 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
114 let state = replace(&mut self.state, ClientState::Invalid);
115
116 match state {
117 ClientState::Invalid =>
118 Err("invalid client state".to_owned()),
119 ClientState::Disconnected =>
120 Ok(Async::Ready(None)),
121 ClientState::Connecting(mut connect) => {
122 match connect.poll() {
123 Ok(Async::Ready(stream)) => {
124 self.state = ClientState::Connected(stream);
125 Ok(Async::Ready(Some(ClientEvent::Online)))
126 },
127 Ok(Async::NotReady) => {
128 self.state = ClientState::Connecting(connect);
129 Ok(Async::NotReady)
130 },
131 Err(e) =>
132 Err(e),
133 }
134 },
135 ClientState::Connected(mut stream) => {
136 match stream.poll() {
137 Ok(Async::NotReady) => {
138 self.state = ClientState::Connected(stream);
139 Ok(Async::NotReady)
140 },
141 Ok(Async::Ready(None)) => {
142 // EOF
143 self.state = ClientState::Disconnected;
144 Ok(Async::Ready(Some(ClientEvent::Disconnected)))
145 },
146 Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
147 self.state = ClientState::Connected(stream);
148 Ok(Async::Ready(Some(ClientEvent::Stanza(stanza))))
149 },
150 Ok(Async::Ready(_)) => {
151 self.state = ClientState::Connected(stream);
152 Ok(Async::NotReady)
153 },
154 Err(e) =>
155 Err(e.description().to_owned()),
156 }
157 },
158 }
159 }
160}
161
162impl Sink for Client {
163 type SinkItem = xml::Element;
164 type SinkError = String;
165
166 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
167 match self.state {
168 ClientState::Connected(ref mut stream) =>
169 match stream.start_send(Packet::Stanza(item)) {
170 Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) =>
171 Ok(AsyncSink::NotReady(stanza)),
172 Ok(AsyncSink::NotReady(_)) =>
173 panic!("Client.start_send with stanza but got something else back"),
174 Ok(AsyncSink::Ready) => {
175 Ok(AsyncSink::Ready)
176 },
177 Err(e) =>
178 Err(e.description().to_owned()),
179 },
180 _ =>
181 Ok(AsyncSink::NotReady(item)),
182 }
183 }
184
185 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
186 match &mut self.state {
187 &mut ClientState::Connected(ref mut stream) =>
188 stream.poll_complete()
189 .map_err(|e| e.description().to_owned()),
190 _ =>
191 Ok(Async::Ready(())),
192 }
193 }
194}