1use futures::{done, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
2use idna;
3use jid::{Jid, JidParseError};
4use minidom::Element;
5use sasl::common::{ChannelBinding, Credentials};
6use std::mem::replace;
7use std::str::FromStr;
8use tokio::net::TcpStream;
9use tokio_io::{AsyncRead, AsyncWrite};
10use tokio_tls::TlsStream;
11
12use super::event::Event;
13use super::happy_eyeballs::Connecter;
14use super::starttls::{StartTlsClient, NS_XMPP_TLS};
15use super::xmpp_codec::Packet;
16use super::xmpp_stream;
17use super::{Error, ProtocolError};
18
19mod auth;
20use self::auth::ClientAuth;
21mod bind;
22use self::bind::ClientBind;
23
24/// XMPP client connection and state
25pub struct Client {
26 /// The client's current Jabber-Id
27 pub jid: Jid,
28 state: ClientState,
29}
30
31type XMPPStream = xmpp_stream::XMPPStream<TlsStream<TcpStream>>;
32const NS_JABBER_CLIENT: &str = "jabber:client";
33
34enum ClientState {
35 Invalid,
36 Disconnected,
37 Connecting(Box<Future<Item = XMPPStream, Error = Error>>),
38 Connected(XMPPStream),
39}
40
41impl Client {
42 /// Start a new XMPP client
43 ///
44 /// Start polling the returned instance so that it will connect
45 /// and yield events.
46 pub fn new(jid: &str, password: &str) -> Result<Self, JidParseError> {
47 let jid = Jid::from_str(jid)?;
48 let client = Self::new_with_jid(jid, password);
49 Ok(client)
50 }
51
52 /// Start a new client given that the JID is already parsed.
53 pub fn new_with_jid(jid: Jid, password: &str) -> Self {
54 let password = password.to_owned();
55 let connect = Self::make_connect(jid.clone(), password.clone());
56 let client = Client {
57 jid,
58 state: ClientState::Connecting(Box::new(connect)),
59 };
60 client
61 }
62
63 fn make_connect(jid: Jid, password: String) -> impl Future<Item = XMPPStream, Error = Error> {
64 let username = jid.node.as_ref().unwrap().to_owned();
65 let jid1 = jid.clone();
66 let jid2 = jid.clone();
67 let password = password;
68 done(idna::domain_to_ascii(&jid.domain))
69 .map_err(|_| Error::Idna)
70 .and_then(|domain| {
71 done(Connecter::from_lookup(
72 &domain,
73 Some("_xmpp-client._tcp"),
74 5222,
75 ))
76 })
77 .flatten()
78 .and_then(move |tcp_stream| {
79 xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_CLIENT.to_owned())
80 })
81 .and_then(|xmpp_stream| {
82 if Self::can_starttls(&xmpp_stream) {
83 Ok(Self::starttls(xmpp_stream))
84 } else {
85 Err(Error::Protocol(ProtocolError::NoTls))
86 }
87 })
88 .flatten()
89 .and_then(|tls_stream| XMPPStream::start(tls_stream, jid2, NS_JABBER_CLIENT.to_owned()))
90 .and_then(
91 move |xmpp_stream| done(Self::auth(xmpp_stream, username, password)), // TODO: flatten?
92 )
93 .and_then(|auth| auth)
94 .and_then(|xmpp_stream| Self::bind(xmpp_stream))
95 .and_then(|xmpp_stream| {
96 // println!("Bound to {}", xmpp_stream.jid);
97 Ok(xmpp_stream)
98 })
99 }
100
101 fn can_starttls<S>(stream: &xmpp_stream::XMPPStream<S>) -> bool {
102 stream
103 .stream_features
104 .get_child("starttls", NS_XMPP_TLS)
105 .is_some()
106 }
107
108 fn starttls<S: AsyncRead + AsyncWrite>(
109 stream: xmpp_stream::XMPPStream<S>,
110 ) -> StartTlsClient<S> {
111 StartTlsClient::from_stream(stream)
112 }
113
114 fn auth<S: AsyncRead + AsyncWrite + 'static>(
115 stream: xmpp_stream::XMPPStream<S>,
116 username: String,
117 password: String,
118 ) -> Result<ClientAuth<S>, Error> {
119 let creds = Credentials::default()
120 .with_username(username)
121 .with_password(password)
122 .with_channel_binding(ChannelBinding::None);
123 ClientAuth::new(stream, creds)
124 }
125
126 fn bind<S: AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> ClientBind<S> {
127 ClientBind::new(stream)
128 }
129}
130
131impl Stream for Client {
132 type Item = Event;
133 type Error = Error;
134
135 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
136 let state = replace(&mut self.state, ClientState::Invalid);
137
138 match state {
139 ClientState::Invalid => Err(Error::InvalidState),
140 ClientState::Disconnected => Ok(Async::Ready(None)),
141 ClientState::Connecting(mut connect) => match connect.poll() {
142 Ok(Async::Ready(stream)) => {
143 self.state = ClientState::Connected(stream);
144 Ok(Async::Ready(Some(Event::Online)))
145 }
146 Ok(Async::NotReady) => {
147 self.state = ClientState::Connecting(connect);
148 Ok(Async::NotReady)
149 }
150 Err(e) => Err(e),
151 },
152 ClientState::Connected(mut stream) => {
153 // Poll sink
154 match stream.poll_complete() {
155 Ok(Async::NotReady) => (),
156 Ok(Async::Ready(())) => (),
157 Err(e) => return Err(e)?,
158 };
159
160 // Poll stream
161 match stream.poll() {
162 Ok(Async::Ready(None)) => {
163 // EOF
164 self.state = ClientState::Disconnected;
165 Ok(Async::Ready(Some(Event::Disconnected)))
166 }
167 Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
168 self.state = ClientState::Connected(stream);
169 Ok(Async::Ready(Some(Event::Stanza(stanza))))
170 }
171 Ok(Async::NotReady) | Ok(Async::Ready(_)) => {
172 self.state = ClientState::Connected(stream);
173 Ok(Async::NotReady)
174 }
175 Err(e) => Err(e)?,
176 }
177 }
178 }
179 }
180}
181
182impl Sink for Client {
183 type SinkItem = Element;
184 type SinkError = Error;
185
186 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
187 match self.state {
188 ClientState::Connected(ref mut stream) => match stream.start_send(Packet::Stanza(item))
189 {
190 Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) => Ok(AsyncSink::NotReady(stanza)),
191 Ok(AsyncSink::NotReady(_)) => {
192 panic!("Client.start_send with stanza but got something else back")
193 }
194 Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready),
195 Err(e) => Err(e)?,
196 },
197 _ => Ok(AsyncSink::NotReady(item)),
198 }
199 }
200
201 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
202 match self.state {
203 ClientState::Connected(ref mut stream) => stream.poll_complete().map_err(|e| e.into()),
204 _ => Ok(Async::Ready(())),
205 }
206 }
207}