mod.rs

  1use futures::{done, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
  2use idna;
  3use xmpp_parsers::{Jid, JidParseError, Element};
  4use sasl::common::{ChannelBinding, Credentials};
  5use std::mem::replace;
  6use std::str::FromStr;
  7use tokio::net::TcpStream;
  8use tokio_io::{AsyncRead, AsyncWrite};
  9use tokio_tls::TlsStream;
 10
 11use super::event::Event;
 12use super::happy_eyeballs::Connecter;
 13use super::starttls::{StartTlsClient, NS_XMPP_TLS};
 14use super::xmpp_codec::Packet;
 15use super::xmpp_stream;
 16use super::{Error, ProtocolError};
 17
 18mod auth;
 19use self::auth::ClientAuth;
 20mod bind;
 21use self::bind::ClientBind;
 22
 23/// XMPP client connection and state
 24pub struct Client {
 25    /// The client's current Jabber-Id
 26    pub jid: Jid,
 27    state: ClientState,
 28}
 29
 30type XMPPStream = xmpp_stream::XMPPStream<TlsStream<TcpStream>>;
 31const NS_JABBER_CLIENT: &str = "jabber:client";
 32
 33enum ClientState {
 34    Invalid,
 35    Disconnected,
 36    Connecting(Box<Future<Item = XMPPStream, Error = Error>>),
 37    Connected(XMPPStream),
 38}
 39
 40impl Client {
 41    /// Start a new XMPP client
 42    ///
 43    /// Start polling the returned instance so that it will connect
 44    /// and yield events.
 45    pub fn new(jid: &str, password: &str) -> Result<Self, JidParseError> {
 46        let jid = Jid::from_str(jid)?;
 47        let client = Self::new_with_jid(jid, password);
 48        Ok(client)
 49    }
 50
 51    /// Start a new client given that the JID is already parsed.
 52    pub fn new_with_jid(jid: Jid, password: &str) -> Self {
 53        let password = password.to_owned();
 54        let connect = Self::make_connect(jid.clone(), password.clone());
 55        let client = Client {
 56            jid,
 57            state: ClientState::Connecting(Box::new(connect)),
 58        };
 59        client
 60    }
 61
 62    fn make_connect(jid: Jid, password: String) -> impl Future<Item = XMPPStream, Error = Error> {
 63        let username = jid.node.as_ref().unwrap().to_owned();
 64        let jid1 = jid.clone();
 65        let jid2 = jid.clone();
 66        let password = password;
 67        done(idna::domain_to_ascii(&jid.domain))
 68            .map_err(|_| Error::Idna)
 69            .and_then(|domain| {
 70                done(Connecter::from_lookup(
 71                    &domain,
 72                    Some("_xmpp-client._tcp"),
 73                    5222,
 74                ))
 75            })
 76            .flatten()
 77            .and_then(move |tcp_stream| {
 78                xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_CLIENT.to_owned())
 79            })
 80            .and_then(|xmpp_stream| {
 81                if Self::can_starttls(&xmpp_stream) {
 82                    Ok(Self::starttls(xmpp_stream))
 83                } else {
 84                    Err(Error::Protocol(ProtocolError::NoTls))
 85                }
 86            })
 87            .flatten()
 88            .and_then(|tls_stream| XMPPStream::start(tls_stream, jid2, NS_JABBER_CLIENT.to_owned()))
 89            .and_then(
 90                move |xmpp_stream| done(Self::auth(xmpp_stream, username, password)), // TODO: flatten?
 91            )
 92            .and_then(|auth| auth)
 93            .and_then(|xmpp_stream| Self::bind(xmpp_stream))
 94            .and_then(|xmpp_stream| {
 95                // println!("Bound to {}", xmpp_stream.jid);
 96                Ok(xmpp_stream)
 97            })
 98    }
 99
100    fn can_starttls<S>(stream: &xmpp_stream::XMPPStream<S>) -> bool {
101        stream
102            .stream_features
103            .get_child("starttls", NS_XMPP_TLS)
104            .is_some()
105    }
106
107    fn starttls<S: AsyncRead + AsyncWrite>(
108        stream: xmpp_stream::XMPPStream<S>,
109    ) -> StartTlsClient<S> {
110        StartTlsClient::from_stream(stream)
111    }
112
113    fn auth<S: AsyncRead + AsyncWrite + 'static>(
114        stream: xmpp_stream::XMPPStream<S>,
115        username: String,
116        password: String,
117    ) -> Result<ClientAuth<S>, Error> {
118        let creds = Credentials::default()
119            .with_username(username)
120            .with_password(password)
121            .with_channel_binding(ChannelBinding::None);
122        ClientAuth::new(stream, creds)
123    }
124
125    fn bind<S: AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> ClientBind<S> {
126        ClientBind::new(stream)
127    }
128}
129
130impl Stream for Client {
131    type Item = Event;
132    type Error = Error;
133
134    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
135        let state = replace(&mut self.state, ClientState::Invalid);
136
137        match state {
138            ClientState::Invalid => Err(Error::InvalidState),
139            ClientState::Disconnected => Ok(Async::Ready(None)),
140            ClientState::Connecting(mut connect) => match connect.poll() {
141                Ok(Async::Ready(stream)) => {
142                    self.state = ClientState::Connected(stream);
143                    Ok(Async::Ready(Some(Event::Online)))
144                }
145                Ok(Async::NotReady) => {
146                    self.state = ClientState::Connecting(connect);
147                    Ok(Async::NotReady)
148                }
149                Err(e) => Err(e),
150            },
151            ClientState::Connected(mut stream) => {
152                // Poll sink
153                match stream.poll_complete() {
154                    Ok(Async::NotReady) => (),
155                    Ok(Async::Ready(())) => (),
156                    Err(e) => return Err(e)?,
157                };
158
159                // Poll stream
160                match stream.poll() {
161                    Ok(Async::Ready(None)) => {
162                        // EOF
163                        self.state = ClientState::Disconnected;
164                        Ok(Async::Ready(Some(Event::Disconnected)))
165                    }
166                    Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
167                        self.state = ClientState::Connected(stream);
168                        Ok(Async::Ready(Some(Event::Stanza(stanza))))
169                    }
170                    Ok(Async::NotReady) | Ok(Async::Ready(_)) => {
171                        self.state = ClientState::Connected(stream);
172                        Ok(Async::NotReady)
173                    }
174                    Err(e) => Err(e)?,
175                }
176            }
177        }
178    }
179}
180
181impl Sink for Client {
182    type SinkItem = Element;
183    type SinkError = Error;
184
185    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
186        match self.state {
187            ClientState::Connected(ref mut stream) => match stream.start_send(Packet::Stanza(item))
188            {
189                Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) => Ok(AsyncSink::NotReady(stanza)),
190                Ok(AsyncSink::NotReady(_)) => {
191                    panic!("Client.start_send with stanza but got something else back")
192                }
193                Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready),
194                Err(e) => Err(e)?,
195            },
196            _ => Ok(AsyncSink::NotReady(item)),
197        }
198    }
199
200    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
201        match self.state {
202            ClientState::Connected(ref mut stream) => stream.poll_complete().map_err(|e| e.into()),
203            _ => Ok(Async::Ready(())),
204        }
205    }
206}