1use std::mem::replace;
2use std::str::FromStr;
3use std::error::Error;
4use tokio_core::reactor::Handle;
5use tokio_core::net::TcpStream;
6use tokio_io::{AsyncRead, AsyncWrite};
7use tokio_tls::TlsStream;
8use futures::*;
9use jid::{Jid, JidParseError};
10use xml;
11use sasl::common::{Credentials, ChannelBinding};
12
13use super::xmpp_codec::Packet;
14use super::xmpp_stream;
15use super::tcp::TcpClient;
16use super::starttls::{NS_XMPP_TLS, StartTlsClient};
17
18mod auth;
19use self::auth::*;
20mod bind;
21use self::bind::*;
22mod event;
23pub use self::event::Event as ClientEvent;
24
25pub struct Client {
26 pub jid: Jid,
27 state: ClientState,
28}
29
30type XMPPStream = xmpp_stream::XMPPStream<TlsStream<TcpStream>>;
31
32enum ClientState {
33 Invalid,
34 Disconnected,
35 Connecting(Box<Future<Item=XMPPStream, Error=String>>),
36 Connected(XMPPStream),
37}
38
39impl Client {
40 pub fn new(jid: &str, password: &str, handle: &Handle) -> Result<Self, JidParseError> {
41 let jid = try!(Jid::from_str(jid));
42 let password = password.to_owned();
43 let connect = Self::make_connect(jid.clone(), password.clone(), handle);
44 Ok(Client {
45 jid,
46 state: ClientState::Connecting(connect),
47 })
48 }
49
50 fn make_connect(jid: Jid, password: String, handle: &Handle) -> Box<Future<Item=XMPPStream, Error=String>> {
51 // TODO: implement proper DNS SRV lookup
52 use std::net::ToSocketAddrs;
53 let addr = "89.238.79.220:5222"
54 .to_socket_addrs().unwrap()
55 .next().unwrap();
56 let username = jid.node.as_ref().unwrap().to_owned();
57 let password = password;
58 Box::new(
59 TcpClient::connect(
60 jid,
61 &addr,
62 handle
63 ).map_err(|e| format!("{}", e)
64 ).and_then(|stream| {
65 if Self::can_starttls(&stream) {
66 Self::starttls(stream)
67 } else {
68 panic!("No STARTTLS")
69 }
70 }).and_then(move |stream| {
71 Self::auth(stream, username, password).expect("auth")
72 }).and_then(|stream| {
73 Self::bind(stream)
74 }).and_then(|stream| {
75 println!("Bound to {}", stream.jid);
76 Ok(stream)
77 })
78 )
79 }
80
81 fn can_starttls<S>(stream: &xmpp_stream::XMPPStream<S>) -> bool {
82 stream.stream_features
83 .get_child("starttls", Some(NS_XMPP_TLS))
84 .is_some()
85 }
86
87 fn starttls<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> StartTlsClient<S> {
88 StartTlsClient::from_stream(stream)
89 }
90
91 fn auth<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>, username: String, password: String) -> Result<ClientAuth<S>, String> {
92 let creds = Credentials::default()
93 .with_username(username)
94 .with_password(password)
95 .with_channel_binding(ChannelBinding::None);
96 ClientAuth::new(stream, creds)
97 }
98
99 fn bind<S: AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> ClientBind<S> {
100 ClientBind::new(stream)
101 }
102}
103
104impl Stream for Client {
105 type Item = ClientEvent;
106 type Error = String;
107
108 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
109 let state = replace(&mut self.state, ClientState::Invalid);
110
111 match state {
112 ClientState::Invalid =>
113 Err("invalid client state".to_owned()),
114 ClientState::Disconnected =>
115 Ok(Async::Ready(None)),
116 ClientState::Connecting(mut connect) => {
117 match connect.poll() {
118 Ok(Async::Ready(stream)) => {
119 self.state = ClientState::Connected(stream);
120 Ok(Async::Ready(Some(ClientEvent::Online)))
121 },
122 Ok(Async::NotReady) => {
123 self.state = ClientState::Connecting(connect);
124 Ok(Async::NotReady)
125 },
126 Err(e) =>
127 Err(e),
128 }
129 },
130 ClientState::Connected(mut stream) => {
131 match stream.poll() {
132 Ok(Async::NotReady) => {
133 self.state = ClientState::Connected(stream);
134 Ok(Async::NotReady)
135 },
136 Ok(Async::Ready(None)) => {
137 // EOF
138 self.state = ClientState::Disconnected;
139 Ok(Async::Ready(Some(ClientEvent::Disconnected)))
140 },
141 Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
142 self.state = ClientState::Connected(stream);
143 Ok(Async::Ready(Some(ClientEvent::Stanza(stanza))))
144 },
145 Ok(Async::Ready(_)) => {
146 self.state = ClientState::Connected(stream);
147 Ok(Async::NotReady)
148 },
149 Err(e) =>
150 Err(e.description().to_owned()),
151 }
152 },
153 }
154 }
155}
156
157impl Sink for Client {
158 type SinkItem = xml::Element;
159 type SinkError = String;
160
161 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
162 match self.state {
163 ClientState::Connected(ref mut stream) =>
164 match stream.start_send(Packet::Stanza(item)) {
165 Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) =>
166 Ok(AsyncSink::NotReady(stanza)),
167 Ok(AsyncSink::NotReady(_)) =>
168 panic!("Client.start_send with stanza but got something else back"),
169 Ok(AsyncSink::Ready) => {
170 Ok(AsyncSink::Ready)
171 },
172 Err(e) =>
173 Err(e.description().to_owned()),
174 },
175 _ =>
176 Ok(AsyncSink::NotReady(item)),
177 }
178 }
179
180 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
181 match &mut self.state {
182 &mut ClientState::Connected(ref mut stream) =>
183 stream.poll_complete()
184 .map_err(|e| e.description().to_owned()),
185 _ =>
186 Ok(Async::Ready(())),
187 }
188 }
189}