mod.rs

  1use std::mem::replace;
  2use std::str::FromStr;
  3use std::error::Error as StdError;
  4use tokio_core::reactor::Handle;
  5use tokio_core::net::TcpStream;
  6use tokio_io::{AsyncRead, AsyncWrite};
  7use tokio_tls::TlsStream;
  8use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink, done};
  9use minidom::Element;
 10use jid::{Jid, JidParseError};
 11use sasl::common::{Credentials, ChannelBinding};
 12use idna;
 13
 14use super::xmpp_codec::Packet;
 15use super::xmpp_stream;
 16use super::starttls::{NS_XMPP_TLS, StartTlsClient};
 17use super::happy_eyeballs::Connecter;
 18use super::event::Event;
 19use super::{Error, ProtocolError};
 20
 21mod auth;
 22use self::auth::ClientAuth;
 23mod bind;
 24use self::bind::ClientBind;
 25
 26/// XMPP client connection and state
 27pub struct Client {
 28    /// The client's current Jabber-Id
 29    pub jid: Jid,
 30    state: ClientState,
 31}
 32
 33type XMPPStream = xmpp_stream::XMPPStream<TlsStream<TcpStream>>;
 34const NS_JABBER_CLIENT: &str = "jabber:client";
 35
 36enum ClientState {
 37    Invalid,
 38    Disconnected,
 39    Connecting(Box<Future<Item=XMPPStream, Error=Error>>),
 40    Connected(XMPPStream),
 41}
 42
 43impl Client {
 44    /// Start a new XMPP client
 45    ///
 46    /// Start polling the returned instance so that it will connect
 47    /// and yield events.
 48    pub fn new(jid: &str, password: &str, handle: Handle) -> Result<Self, JidParseError> {
 49        let jid = Jid::from_str(jid)?;
 50        let password = password.to_owned();
 51        let connect = Self::make_connect(jid.clone(), password.clone(), handle);
 52        Ok(Client {
 53            jid,
 54            state: ClientState::Connecting(Box::new(connect)),
 55        })
 56    }
 57
 58    fn make_connect(jid: Jid, password: String, handle: Handle) -> impl Future<Item=XMPPStream, Error=Error> {
 59        let username = jid.node.as_ref().unwrap().to_owned();
 60        let jid1 = jid.clone();
 61        let jid2 = jid.clone();
 62        let password = password;
 63        done(idna::domain_to_ascii(&jid.domain))
 64            .map_err(|_| Error::Idna)
 65            .and_then(|domain|
 66                      done(Connecter::from_lookup(handle, &domain, "_xmpp-client._tcp", 5222))
 67                      .map_err(Error::Domain)
 68            )
 69            .and_then(|connecter|
 70                      connecter
 71                      .map_err(Error::Connection)
 72            ).and_then(move |tcp_stream|
 73                      xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_CLIENT.to_owned())
 74            ).and_then(|xmpp_stream| {
 75                if Self::can_starttls(&xmpp_stream) {
 76                    Ok(Self::starttls(xmpp_stream))
 77                } else {
 78                    Err(Error::Protocol(ProtocolError::NoTls))
 79                }
 80            }).and_then(|starttls|
 81                        // TODO: flatten?
 82                        starttls
 83            ).and_then(|tls_stream|
 84                       XMPPStream::start(tls_stream, jid2, NS_JABBER_CLIENT.to_owned())
 85            ).and_then(move |xmpp_stream|
 86                       done(Self::auth(xmpp_stream, username, password))
 87                       // TODO: flatten?
 88            ).and_then(|auth| auth)
 89            .and_then(|xmpp_stream| {
 90                Self::bind(xmpp_stream)
 91            }).and_then(|xmpp_stream| {
 92                // println!("Bound to {}", xmpp_stream.jid);
 93                Ok(xmpp_stream)
 94            })
 95    }
 96
 97    fn can_starttls<S>(stream: &xmpp_stream::XMPPStream<S>) -> bool {
 98        stream.stream_features
 99            .get_child("starttls", NS_XMPP_TLS)
100            .is_some()
101    }
102
103    fn starttls<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> StartTlsClient<S> {
104        StartTlsClient::from_stream(stream)
105    }
106
107    fn auth<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>, username: String, password: String) -> Result<ClientAuth<S>, Error> {
108        let creds = Credentials::default()
109            .with_username(username)
110            .with_password(password)
111            .with_channel_binding(ChannelBinding::None);
112        ClientAuth::new(stream, creds)
113    }
114
115    fn bind<S: AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> ClientBind<S> {
116        ClientBind::new(stream)
117    }
118}
119
120impl Stream for Client {
121    type Item = Event;
122    type Error = Error;
123
124    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
125        let state = replace(&mut self.state, ClientState::Invalid);
126
127        match state {
128            ClientState::Invalid =>
129                Err(Error::InvalidState),
130            ClientState::Disconnected =>
131                Ok(Async::Ready(None)),
132            ClientState::Connecting(mut connect) => {
133                match connect.poll() {
134                    Ok(Async::Ready(stream)) => {
135                        self.state = ClientState::Connected(stream);
136                        Ok(Async::Ready(Some(Event::Online)))
137                    },
138                    Ok(Async::NotReady) => {
139                        self.state = ClientState::Connecting(connect);
140                        Ok(Async::NotReady)
141                    },
142                    Err(e) =>
143                        Err(e),
144                }
145            },
146            ClientState::Connected(mut stream) => {
147                // Poll sink
148                match stream.poll_complete() {
149                    Ok(Async::NotReady) => (),
150                    Ok(Async::Ready(())) => (),
151                    Err(e) =>
152                        return Err(Error::Io(e)),
153                };
154
155                // Poll stream
156                match stream.poll() {
157                    Ok(Async::Ready(None)) => {
158                        // EOF
159                        self.state = ClientState::Disconnected;
160                        Ok(Async::Ready(Some(Event::Disconnected)))
161                    },
162                    Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
163                        self.state = ClientState::Connected(stream);
164                        Ok(Async::Ready(Some(Event::Stanza(stanza))))
165                    },
166                    Ok(Async::NotReady) |
167                    Ok(Async::Ready(_)) => {
168                        self.state = ClientState::Connected(stream);
169                        Ok(Async::NotReady)
170                    },
171                    Err(e) =>
172                        Err(e.into()),
173                }
174            },
175        }
176    }
177}
178
179impl Sink for Client {
180    type SinkItem = Element;
181    type SinkError = String;
182
183    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
184        match self.state {
185            ClientState::Connected(ref mut stream) =>
186                match stream.start_send(Packet::Stanza(item)) {
187                    Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) =>
188                        Ok(AsyncSink::NotReady(stanza)),
189                    Ok(AsyncSink::NotReady(_)) =>
190                        panic!("Client.start_send with stanza but got something else back"),
191                    Ok(AsyncSink::Ready) => {
192                        Ok(AsyncSink::Ready)
193                    },
194                    Err(e) =>
195                        Err(e.description().to_owned()),
196                },
197            _ =>
198                Ok(AsyncSink::NotReady(item)),
199        }
200    }
201
202    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
203        match self.state {
204            ClientState::Connected(ref mut stream) =>
205                stream.poll_complete()
206                .map_err(|e| e.description().to_owned()),
207            _ =>
208                Ok(Async::Ready(())),
209        }
210    }
211}