1use futures::{done, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
2use idna;
3use xmpp_parsers::{Jid, JidParseError};
4use sasl::common::{ChannelBinding, Credentials};
5use std::mem::replace;
6use std::str::FromStr;
7use tokio::net::TcpStream;
8use tokio_io::{AsyncRead, AsyncWrite};
9use tokio_tls::TlsStream;
10
11use super::event::Event;
12use super::happy_eyeballs::Connecter;
13use super::starttls::{StartTlsClient, NS_XMPP_TLS};
14use super::xmpp_codec::Packet;
15use super::xmpp_stream;
16use super::{Error, ProtocolError};
17
18mod auth;
19use self::auth::ClientAuth;
20mod bind;
21use self::bind::ClientBind;
22
23/// XMPP client connection and state
24pub struct Client {
25 /// The client's current Jabber-Id
26 pub jid: Jid,
27 state: ClientState,
28}
29
30type XMPPStream = xmpp_stream::XMPPStream<TlsStream<TcpStream>>;
/// XML namespace passed to `XMPPStream::start` when opening client streams.
const NS_JABBER_CLIENT: &str = "jabber:client";
32
33enum ClientState {
34 Invalid,
35 Disconnected,
36 Connecting(Box<dyn Future<Item = XMPPStream, Error = Error>>),
37 Connected(XMPPStream),
38}
39
40impl Client {
41 /// Start a new XMPP client
42 ///
43 /// Start polling the returned instance so that it will connect
44 /// and yield events.
45 pub fn new(jid: &str, password: &str) -> Result<Self, JidParseError> {
46 let jid = Jid::from_str(jid)?;
47 let client = Self::new_with_jid(jid, password);
48 Ok(client)
49 }
50
51 /// Start a new client given that the JID is already parsed.
52 pub fn new_with_jid(jid: Jid, password: &str) -> Self {
53 let password = password.to_owned();
54 let connect = Self::make_connect(jid.clone(), password.clone());
55 let client = Client {
56 jid,
57 state: ClientState::Connecting(Box::new(connect)),
58 };
59 client
60 }
61
62 fn make_connect(jid: Jid, password: String) -> impl Future<Item = XMPPStream, Error = Error> {
63 let username = jid.node.as_ref().unwrap().to_owned();
64 let jid1 = jid.clone();
65 let jid2 = jid.clone();
66 let password = password;
67 done(idna::domain_to_ascii(&jid.domain))
68 .map_err(|_| Error::Idna)
69 .and_then(|domain| {
70 done(Connecter::from_lookup(
71 &domain,
72 Some("_xmpp-client._tcp"),
73 5222,
74 ))
75 })
76 .flatten()
77 .and_then(move |tcp_stream| {
78 xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_CLIENT.to_owned())
79 })
80 .and_then(|xmpp_stream| {
81 if Self::can_starttls(&xmpp_stream) {
82 Ok(Self::starttls(xmpp_stream))
83 } else {
84 Err(Error::Protocol(ProtocolError::NoTls))
85 }
86 })
87 .flatten()
88 .and_then(|tls_stream| XMPPStream::start(tls_stream, jid2, NS_JABBER_CLIENT.to_owned()))
89 .and_then(
90 move |xmpp_stream| done(Self::auth(xmpp_stream, username, password)), // TODO: flatten?
91 )
92 .and_then(|auth| auth)
93 .and_then(|xmpp_stream| Self::bind(xmpp_stream))
94 .and_then(|xmpp_stream| {
95 // println!("Bound to {}", xmpp_stream.jid);
96 Ok(xmpp_stream)
97 })
98 }
99
100 fn can_starttls<S>(stream: &xmpp_stream::XMPPStream<S>) -> bool {
101 stream
102 .stream_features
103 .get_child("starttls", NS_XMPP_TLS)
104 .is_some()
105 }
106
107 fn starttls<S: AsyncRead + AsyncWrite>(
108 stream: xmpp_stream::XMPPStream<S>,
109 ) -> StartTlsClient<S> {
110 StartTlsClient::from_stream(stream)
111 }
112
113 fn auth<S: AsyncRead + AsyncWrite + 'static>(
114 stream: xmpp_stream::XMPPStream<S>,
115 username: String,
116 password: String,
117 ) -> Result<ClientAuth<S>, Error> {
118 let creds = Credentials::default()
119 .with_username(username)
120 .with_password(password)
121 .with_channel_binding(ChannelBinding::None);
122 ClientAuth::new(stream, creds)
123 }
124
125 fn bind<S: AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> ClientBind<S> {
126 ClientBind::new(stream)
127 }
128}
129
130impl Stream for Client {
131 type Item = Event;
132 type Error = Error;
133
134 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
135 let state = replace(&mut self.state, ClientState::Invalid);
136
137 match state {
138 ClientState::Invalid => Err(Error::InvalidState),
139 ClientState::Disconnected => Ok(Async::Ready(None)),
140 ClientState::Connecting(mut connect) => match connect.poll() {
141 Ok(Async::Ready(stream)) => {
142 self.state = ClientState::Connected(stream);
143 Ok(Async::Ready(Some(Event::Online)))
144 }
145 Ok(Async::NotReady) => {
146 self.state = ClientState::Connecting(connect);
147 Ok(Async::NotReady)
148 }
149 Err(e) => Err(e),
150 },
151 ClientState::Connected(mut stream) => {
152 // Poll sink
153 match stream.poll_complete() {
154 Ok(Async::NotReady) => (),
155 Ok(Async::Ready(())) => (),
156 Err(e) => return Err(e)?,
157 };
158
159 // Poll stream
160 match stream.poll() {
161 Ok(Async::Ready(None)) => {
162 // EOF
163 self.state = ClientState::Disconnected;
164 Ok(Async::Ready(Some(Event::Disconnected)))
165 }
166 Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
167 // Receive stanza
168 self.state = ClientState::Connected(stream);
169 Ok(Async::Ready(Some(Event::Stanza(stanza))))
170 }
171 Ok(Async::Ready(Some(Packet::Text(_)))) => {
172 // Ignore text between stanzas
173 Ok(Async::NotReady)
174 }
175 Ok(Async::Ready(Some(Packet::StreamStart(_)))) => {
176 // <stream:stream>
177 Err(ProtocolError::InvalidStreamStart.into())
178 }
179 Ok(Async::Ready(Some(Packet::StreamEnd))) => {
180 // End of stream: </stream:stream>
181 Ok(Async::Ready(None))
182 }
183 Ok(Async::NotReady) => {
184 // Try again later
185 self.state = ClientState::Connected(stream);
186 Ok(Async::NotReady)
187 }
188 Err(e) => Err(e)?,
189 }
190 }
191 }
192 }
193}
194
195impl Sink for Client {
196 type SinkItem = Packet;
197 type SinkError = Error;
198
199 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
200 match self.state {
201 ClientState::Connected(ref mut stream) =>
202 Ok(stream.start_send(item)?),
203 _ =>
204 Ok(AsyncSink::NotReady(item)),
205 }
206 }
207
208 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
209 match self.state {
210 ClientState::Connected(ref mut stream) => stream.poll_complete().map_err(|e| e.into()),
211 _ => Ok(Async::Ready(())),
212 }
213 }
214
215 /// This closes the inner TCP stream.
216 ///
217 /// To synchronize your shutdown with the server side, you should
218 /// first send `Packet::StreamEnd` and wait for the end of the
219 /// incoming stream before closing the connection.
220 fn close(&mut self) -> Poll<(), Self::SinkError> {
221 match self.state {
222 ClientState::Connected(ref mut stream) =>
223 stream.close()
224 .map_err(|e| e.into()),
225 _ =>
226 Ok(Async::Ready(())),
227 }
228 }
229}