mod.rs

  1use futures::{done, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
  2use idna;
  3use xmpp_parsers::{Jid, JidParseError};
  4use sasl::common::{ChannelBinding, Credentials};
  5use std::mem::replace;
  6use std::str::FromStr;
  7use tokio::net::TcpStream;
  8use tokio_io::{AsyncRead, AsyncWrite};
  9use tokio_tls::TlsStream;
 10
 11use super::event::Event;
 12use super::happy_eyeballs::Connecter;
 13use super::starttls::{StartTlsClient, NS_XMPP_TLS};
 14use super::xmpp_codec::Packet;
 15use super::xmpp_stream;
 16use super::{Error, ProtocolError};
 17
 18mod auth;
 19use self::auth::ClientAuth;
 20mod bind;
 21use self::bind::ClientBind;
 22
/// XMPP client connection and state
///
/// The client is driven by polling: its `Stream` implementation advances
/// the connection setup and yields `Event`s, while its `Sink`
/// implementation forwards outgoing `Packet`s to the underlying stream
/// once the client is connected.
pub struct Client {
    /// The client's current Jabber-Id
    pub jid: Jid,
    /// Current position in the connection life cycle (see `ClientState`).
    state: ClientState,
}
 29
/// Concrete stream type held by a connected client: an XMPP stream
/// running over a TLS stream over TCP.
type XMPPStream = xmpp_stream::XMPPStream<TlsStream<TcpStream>>;
/// Namespace passed to `XMPPStream::start` when opening client streams.
const NS_JABBER_CLIENT: &str = "jabber:client";
 32
/// Connection life cycle of a `Client`.
enum ClientState {
    /// Placeholder stored in `Client::state` while `poll` has moved the
    /// real state out. Polling a client left in this state yields
    /// `Error::InvalidState`.
    Invalid,
    /// The connection has ended; polling reports end-of-stream.
    Disconnected,
    /// Connection setup is still in progress; holds the boxed future
    /// that resolves to the ready-to-use stream.
    Connecting(Box<dyn Future<Item = XMPPStream, Error = Error>>),
    /// Connection setup finished; holds the stream used to send and
    /// receive packets.
    Connected(XMPPStream),
}
 39
 40impl Client {
 41    /// Start a new XMPP client
 42    ///
 43    /// Start polling the returned instance so that it will connect
 44    /// and yield events.
 45    pub fn new(jid: &str, password: &str) -> Result<Self, JidParseError> {
 46        let jid = Jid::from_str(jid)?;
 47        let client = Self::new_with_jid(jid, password);
 48        Ok(client)
 49    }
 50
 51    /// Start a new client given that the JID is already parsed.
 52    pub fn new_with_jid(jid: Jid, password: &str) -> Self {
 53        let password = password.to_owned();
 54        let connect = Self::make_connect(jid.clone(), password.clone());
 55        let client = Client {
 56            jid,
 57            state: ClientState::Connecting(Box::new(connect)),
 58        };
 59        client
 60    }
 61
 62    fn make_connect(jid: Jid, password: String) -> impl Future<Item = XMPPStream, Error = Error> {
 63        let username = jid.node.as_ref().unwrap().to_owned();
 64        let jid1 = jid.clone();
 65        let jid2 = jid.clone();
 66        let password = password;
 67        done(idna::domain_to_ascii(&jid.domain))
 68            .map_err(|_| Error::Idna)
 69            .and_then(|domain| {
 70                done(Connecter::from_lookup(
 71                    &domain,
 72                    Some("_xmpp-client._tcp"),
 73                    5222,
 74                ))
 75            })
 76            .flatten()
 77            .and_then(move |tcp_stream| {
 78                xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_CLIENT.to_owned())
 79            })
 80            .and_then(|xmpp_stream| {
 81                if Self::can_starttls(&xmpp_stream) {
 82                    Ok(Self::starttls(xmpp_stream))
 83                } else {
 84                    Err(Error::Protocol(ProtocolError::NoTls))
 85                }
 86            })
 87            .flatten()
 88            .and_then(|tls_stream| XMPPStream::start(tls_stream, jid2, NS_JABBER_CLIENT.to_owned()))
 89            .and_then(
 90                move |xmpp_stream| done(Self::auth(xmpp_stream, username, password)), // TODO: flatten?
 91            )
 92            .and_then(|auth| auth)
 93            .and_then(|xmpp_stream| Self::bind(xmpp_stream))
 94            .and_then(|xmpp_stream| {
 95                // println!("Bound to {}", xmpp_stream.jid);
 96                Ok(xmpp_stream)
 97            })
 98    }
 99
100    fn can_starttls<S>(stream: &xmpp_stream::XMPPStream<S>) -> bool {
101        stream
102            .stream_features
103            .get_child("starttls", NS_XMPP_TLS)
104            .is_some()
105    }
106
107    fn starttls<S: AsyncRead + AsyncWrite>(
108        stream: xmpp_stream::XMPPStream<S>,
109    ) -> StartTlsClient<S> {
110        StartTlsClient::from_stream(stream)
111    }
112
113    fn auth<S: AsyncRead + AsyncWrite + 'static>(
114        stream: xmpp_stream::XMPPStream<S>,
115        username: String,
116        password: String,
117    ) -> Result<ClientAuth<S>, Error> {
118        let creds = Credentials::default()
119            .with_username(username)
120            .with_password(password)
121            .with_channel_binding(ChannelBinding::None);
122        ClientAuth::new(stream, creds)
123    }
124
125    fn bind<S: AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> ClientBind<S> {
126        ClientBind::new(stream)
127    }
128}
129
130impl Stream for Client {
131    type Item = Event;
132    type Error = Error;
133
134    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
135        let state = replace(&mut self.state, ClientState::Invalid);
136
137        match state {
138            ClientState::Invalid => Err(Error::InvalidState),
139            ClientState::Disconnected => Ok(Async::Ready(None)),
140            ClientState::Connecting(mut connect) => match connect.poll() {
141                Ok(Async::Ready(stream)) => {
142                    self.state = ClientState::Connected(stream);
143                    Ok(Async::Ready(Some(Event::Online)))
144                }
145                Ok(Async::NotReady) => {
146                    self.state = ClientState::Connecting(connect);
147                    Ok(Async::NotReady)
148                }
149                Err(e) => Err(e),
150            },
151            ClientState::Connected(mut stream) => {
152                // Poll sink
153                match stream.poll_complete() {
154                    Ok(Async::NotReady) => (),
155                    Ok(Async::Ready(())) => (),
156                    Err(e) => return Err(e)?,
157                };
158
159                // Poll stream
160                match stream.poll() {
161                    Ok(Async::Ready(None)) => {
162                        // EOF
163                        self.state = ClientState::Disconnected;
164                        Ok(Async::Ready(Some(Event::Disconnected)))
165                    }
166                    Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
167                        // Receive stanza
168                        self.state = ClientState::Connected(stream);
169                        Ok(Async::Ready(Some(Event::Stanza(stanza))))
170                    }
171                    Ok(Async::Ready(Some(Packet::Text(_)))) => {
172                        // Ignore text between stanzas
173                        Ok(Async::NotReady)
174                    }
175                    Ok(Async::Ready(Some(Packet::StreamStart(_)))) => {
176                        // <stream:stream>
177                        Err(ProtocolError::InvalidStreamStart.into())
178                    }
179                    Ok(Async::Ready(Some(Packet::StreamEnd))) => {
180                        // End of stream: </stream:stream>
181                        Ok(Async::Ready(None))
182                    }
183                    Ok(Async::NotReady) => {
184                        // Try again later
185                        self.state = ClientState::Connected(stream);
186                        Ok(Async::NotReady)
187                    }
188                    Err(e) => Err(e)?,
189                }
190            }
191        }
192    }
193}
194
195impl Sink for Client {
196    type SinkItem = Packet;
197    type SinkError = Error;
198
199    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
200        match self.state {
201            ClientState::Connected(ref mut stream) =>
202                Ok(stream.start_send(item)?),
203            _ =>
204                Ok(AsyncSink::NotReady(item)),
205        }
206    }
207
208    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
209        match self.state {
210            ClientState::Connected(ref mut stream) => stream.poll_complete().map_err(|e| e.into()),
211            _ => Ok(Async::Ready(())),
212        }
213    }
214
215    /// This closes the inner TCP stream.
216    ///
217    /// To synchronize your shutdown with the server side, you should
218    /// first send `Packet::StreamEnd` and wait for the end of the
219    /// incoming stream before closing the connection.
220    fn close(&mut self) -> Poll<(), Self::SinkError> {
221        match self.state {
222            ClientState::Connected(ref mut stream) =>
223                stream.close()
224                .map_err(|e| e.into()),
225            _ =>
226                Ok(Async::Ready(())),
227        }
228    }
229}