mod.rs

  1use futures::{done, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
  2use idna;
  3use xmpp_parsers::{Jid, JidParseError};
  4use sasl::common::{ChannelBinding, Credentials};
  5use std::mem::replace;
  6use std::str::FromStr;
  7use tokio::net::TcpStream;
  8use tokio_io::{AsyncRead, AsyncWrite};
  9use tokio_tls::TlsStream;
 10
 11use super::event::Event;
 12use super::happy_eyeballs::Connecter;
 13use super::starttls::{StartTlsClient, NS_XMPP_TLS};
 14use super::xmpp_codec::Packet;
 15use super::xmpp_stream;
 16use super::{Error, ProtocolError};
 17
 18mod auth;
 19use self::auth::ClientAuth;
 20mod bind;
 21use self::bind::ClientBind;
 22
 23/// XMPP client connection and state
 24pub struct Client {
 25    state: ClientState,
 26}
 27
 28type XMPPStream = xmpp_stream::XMPPStream<TlsStream<TcpStream>>;
 29const NS_JABBER_CLIENT: &str = "jabber:client";
 30
 31enum ClientState {
 32    Invalid,
 33    Disconnected,
 34    Connecting(Box<dyn Future<Item = XMPPStream, Error = Error>>),
 35    Connected(XMPPStream),
 36}
 37
 38impl Client {
 39    /// Start a new XMPP client
 40    ///
 41    /// Start polling the returned instance so that it will connect
 42    /// and yield events.
 43    pub fn new(jid: &str, password: &str) -> Result<Self, JidParseError> {
 44        let jid = Jid::from_str(jid)?;
 45        let client = Self::new_with_jid(jid, password);
 46        Ok(client)
 47    }
 48
 49    /// Start a new client given that the JID is already parsed.
 50    pub fn new_with_jid(jid: Jid, password: &str) -> Self {
 51        let password = password.to_owned();
 52        let connect = Self::make_connect(jid, password.clone());
 53        let client = Client {
 54            state: ClientState::Connecting(Box::new(connect)),
 55        };
 56        client
 57    }
 58
 59    fn make_connect(jid: Jid, password: String) -> impl Future<Item = XMPPStream, Error = Error> {
 60        let username = jid.clone().node().unwrap();
 61        let jid1 = jid.clone();
 62        let jid2 = jid.clone();
 63        let password = password;
 64        done(idna::domain_to_ascii(&jid.domain()))
 65            .map_err(|_| Error::Idna)
 66            .and_then(|domain| {
 67                done(Connecter::from_lookup(
 68                    &domain,
 69                    Some("_xmpp-client._tcp"),
 70                    5222,
 71                ))
 72            })
 73            .flatten()
 74            .and_then(move |tcp_stream| {
 75                xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_CLIENT.to_owned())
 76            })
 77            .and_then(|xmpp_stream| {
 78                if Self::can_starttls(&xmpp_stream) {
 79                    Ok(Self::starttls(xmpp_stream))
 80                } else {
 81                    Err(Error::Protocol(ProtocolError::NoTls))
 82                }
 83            })
 84            .flatten()
 85            .and_then(|tls_stream| XMPPStream::start(tls_stream, jid2, NS_JABBER_CLIENT.to_owned()))
 86            .and_then(
 87                move |xmpp_stream| done(Self::auth(xmpp_stream, username, password)), // TODO: flatten?
 88            )
 89            .and_then(|auth| auth)
 90            .and_then(|xmpp_stream| Self::bind(xmpp_stream))
 91            .and_then(|xmpp_stream| {
 92                // println!("Bound to {}", xmpp_stream.jid);
 93                Ok(xmpp_stream)
 94            })
 95    }
 96
 97    fn can_starttls<S>(stream: &xmpp_stream::XMPPStream<S>) -> bool {
 98        stream
 99            .stream_features
100            .get_child("starttls", NS_XMPP_TLS)
101            .is_some()
102    }
103
104    fn starttls<S: AsyncRead + AsyncWrite>(
105        stream: xmpp_stream::XMPPStream<S>,
106    ) -> StartTlsClient<S> {
107        StartTlsClient::from_stream(stream)
108    }
109
110    fn auth<S: AsyncRead + AsyncWrite + 'static>(
111        stream: xmpp_stream::XMPPStream<S>,
112        username: String,
113        password: String,
114    ) -> Result<ClientAuth<S>, Error> {
115        let creds = Credentials::default()
116            .with_username(username)
117            .with_password(password)
118            .with_channel_binding(ChannelBinding::None);
119        ClientAuth::new(stream, creds)
120    }
121
122    fn bind<S: AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> ClientBind<S> {
123        ClientBind::new(stream)
124    }
125
126    /// Get the client's bound JID (the one reported by the XMPP
127    /// server).
128    pub fn bound_jid(&self) -> Option<&Jid> {
129        match self.state {
130            ClientState::Connected(ref stream) => Some(&stream.jid),
131            _ => None,
132        }
133    }
134}
135
136impl Stream for Client {
137    type Item = Event;
138    type Error = Error;
139
140    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
141        let state = replace(&mut self.state, ClientState::Invalid);
142
143        match state {
144            ClientState::Invalid => Err(Error::InvalidState),
145            ClientState::Disconnected => Ok(Async::Ready(None)),
146            ClientState::Connecting(mut connect) => match connect.poll() {
147                Ok(Async::Ready(stream)) => {
148                    let jid = stream.jid.clone();
149                    self.state = ClientState::Connected(stream);
150                    Ok(Async::Ready(Some(Event::Online(jid))))
151                }
152                Ok(Async::NotReady) => {
153                    self.state = ClientState::Connecting(connect);
154                    Ok(Async::NotReady)
155                }
156                Err(e) => Err(e),
157            },
158            ClientState::Connected(mut stream) => {
159                // Poll sink
160                match stream.poll_complete() {
161                    Ok(Async::NotReady) => (),
162                    Ok(Async::Ready(())) => (),
163                    Err(e) => return Err(e)?,
164                };
165
166                // Poll stream
167                match stream.poll() {
168                    Ok(Async::Ready(None)) => {
169                        // EOF
170                        self.state = ClientState::Disconnected;
171                        Ok(Async::Ready(Some(Event::Disconnected)))
172                    }
173                    Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
174                        // Receive stanza
175                        self.state = ClientState::Connected(stream);
176                        Ok(Async::Ready(Some(Event::Stanza(stanza))))
177                    }
178                    Ok(Async::Ready(Some(Packet::Text(_)))) => {
179                        // Ignore text between stanzas
180                        Ok(Async::NotReady)
181                    }
182                    Ok(Async::Ready(Some(Packet::StreamStart(_)))) => {
183                        // <stream:stream>
184                        Err(ProtocolError::InvalidStreamStart.into())
185                    }
186                    Ok(Async::Ready(Some(Packet::StreamEnd))) => {
187                        // End of stream: </stream:stream>
188                        Ok(Async::Ready(None))
189                    }
190                    Ok(Async::NotReady) => {
191                        // Try again later
192                        self.state = ClientState::Connected(stream);
193                        Ok(Async::NotReady)
194                    }
195                    Err(e) => Err(e)?,
196                }
197            }
198        }
199    }
200}
201
202impl Sink for Client {
203    type SinkItem = Packet;
204    type SinkError = Error;
205
206    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
207        match self.state {
208            ClientState::Connected(ref mut stream) =>
209                Ok(stream.start_send(item)?),
210            _ =>
211                Ok(AsyncSink::NotReady(item)),
212        }
213    }
214
215    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
216        match self.state {
217            ClientState::Connected(ref mut stream) => stream.poll_complete().map_err(|e| e.into()),
218            _ => Ok(Async::Ready(())),
219        }
220    }
221
222    /// This closes the inner TCP stream.
223    ///
224    /// To synchronize your shutdown with the server side, you should
225    /// first send `Packet::StreamEnd` and wait for the end of the
226    /// incoming stream before closing the connection.
227    fn close(&mut self) -> Poll<(), Self::SinkError> {
228        match self.state {
229            ClientState::Connected(ref mut stream) =>
230                stream.close()
231                .map_err(|e| e.into()),
232            _ =>
233                Ok(Async::Ready(())),
234        }
235    }
236}