mod.rs

  1use futures::{done, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
  2use idna;
  3use jid::{Jid, JidParseError};
  4use minidom::Element;
  5use sasl::common::{ChannelBinding, Credentials};
  6use std::mem::replace;
  7use std::str::FromStr;
  8use tokio::net::TcpStream;
  9use tokio_io::{AsyncRead, AsyncWrite};
 10use tokio_tls::TlsStream;
 11
 12use super::event::Event;
 13use super::happy_eyeballs::Connecter;
 14use super::starttls::{StartTlsClient, NS_XMPP_TLS};
 15use super::xmpp_codec::Packet;
 16use super::xmpp_stream;
 17use super::{Error, ProtocolError};
 18
 19mod auth;
 20use self::auth::ClientAuth;
 21mod bind;
 22use self::bind::ClientBind;
 23
/// XMPP client connection and state
///
/// A `Client` is both a `Stream` of `Event`s and a `Sink` for stanzas
/// (`Element`s). Polling it as a stream drives the connection process.
pub struct Client {
    /// The client's current Jabber-Id
    pub jid: Jid,
    /// Where the connection currently is in its life cycle
    /// (connecting, connected, disconnected).
    state: ClientState,
}
 30
/// The concrete stream type used by the client: an XMPP stream running
/// on top of a TLS-wrapped TCP connection.
type XMPPStream = xmpp_stream::XMPPStream<TlsStream<TcpStream>>;
/// XML namespace handed to `XMPPStream::start` when opening a stream.
const NS_JABBER_CLIENT: &str = "jabber:client";
 33
/// Connection life cycle of a `Client`.
enum ClientState {
    /// Placeholder left in `Client::state` while `poll` has taken the real
    /// state out; polling in this state yields `Error::InvalidState`.
    Invalid,
    /// The stream has reached EOF; polling yields end-of-stream.
    Disconnected,
    /// Connection setup is in progress; the future resolves to the
    /// ready-to-use stream.
    Connecting(Box<Future<Item = XMPPStream, Error = Error>>),
    /// The stream is established and stanzas can be exchanged.
    Connected(XMPPStream),
}
 40
 41impl Client {
 42    /// Start a new XMPP client
 43    ///
 44    /// Start polling the returned instance so that it will connect
 45    /// and yield events.
 46    pub fn new(jid: &str, password: &str) -> Result<Self, JidParseError> {
 47        let jid = Jid::from_str(jid)?;
 48        let client = Self::new_with_jid(jid, password);
 49        Ok(client)
 50    }
 51
 52    /// Start a new client given that the JID is already parsed.
 53    pub fn new_with_jid(jid: Jid, password: &str) -> Self {
 54        let password = password.to_owned();
 55        let connect = Self::make_connect(jid.clone(), password.clone());
 56        let client = Client {
 57            jid,
 58            state: ClientState::Connecting(Box::new(connect)),
 59        };
 60        client
 61    }
 62
 63    fn make_connect(jid: Jid, password: String) -> impl Future<Item = XMPPStream, Error = Error> {
 64        let username = jid.node.as_ref().unwrap().to_owned();
 65        let jid1 = jid.clone();
 66        let jid2 = jid.clone();
 67        let password = password;
 68        done(idna::domain_to_ascii(&jid.domain))
 69            .map_err(|_| Error::Idna)
 70            .and_then(|domain| {
 71                done(Connecter::from_lookup(
 72                    &domain,
 73                    Some("_xmpp-client._tcp"),
 74                    5222,
 75                ))
 76            })
 77            .flatten()
 78            .and_then(move |tcp_stream| {
 79                xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_CLIENT.to_owned())
 80            })
 81            .and_then(|xmpp_stream| {
 82                if Self::can_starttls(&xmpp_stream) {
 83                    Ok(Self::starttls(xmpp_stream))
 84                } else {
 85                    Err(Error::Protocol(ProtocolError::NoTls))
 86                }
 87            })
 88            .flatten()
 89            .and_then(|tls_stream| XMPPStream::start(tls_stream, jid2, NS_JABBER_CLIENT.to_owned()))
 90            .and_then(
 91                move |xmpp_stream| done(Self::auth(xmpp_stream, username, password)), // TODO: flatten?
 92            )
 93            .and_then(|auth| auth)
 94            .and_then(|xmpp_stream| Self::bind(xmpp_stream))
 95            .and_then(|xmpp_stream| {
 96                // println!("Bound to {}", xmpp_stream.jid);
 97                Ok(xmpp_stream)
 98            })
 99    }
100
101    fn can_starttls<S>(stream: &xmpp_stream::XMPPStream<S>) -> bool {
102        stream
103            .stream_features
104            .get_child("starttls", NS_XMPP_TLS)
105            .is_some()
106    }
107
108    fn starttls<S: AsyncRead + AsyncWrite>(
109        stream: xmpp_stream::XMPPStream<S>,
110    ) -> StartTlsClient<S> {
111        StartTlsClient::from_stream(stream)
112    }
113
114    fn auth<S: AsyncRead + AsyncWrite + 'static>(
115        stream: xmpp_stream::XMPPStream<S>,
116        username: String,
117        password: String,
118    ) -> Result<ClientAuth<S>, Error> {
119        let creds = Credentials::default()
120            .with_username(username)
121            .with_password(password)
122            .with_channel_binding(ChannelBinding::None);
123        ClientAuth::new(stream, creds)
124    }
125
126    fn bind<S: AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> ClientBind<S> {
127        ClientBind::new(stream)
128    }
129}
130
131impl Stream for Client {
132    type Item = Event;
133    type Error = Error;
134
135    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
136        let state = replace(&mut self.state, ClientState::Invalid);
137
138        match state {
139            ClientState::Invalid => Err(Error::InvalidState),
140            ClientState::Disconnected => Ok(Async::Ready(None)),
141            ClientState::Connecting(mut connect) => match connect.poll() {
142                Ok(Async::Ready(stream)) => {
143                    self.state = ClientState::Connected(stream);
144                    Ok(Async::Ready(Some(Event::Online)))
145                }
146                Ok(Async::NotReady) => {
147                    self.state = ClientState::Connecting(connect);
148                    Ok(Async::NotReady)
149                }
150                Err(e) => Err(e),
151            },
152            ClientState::Connected(mut stream) => {
153                // Poll sink
154                match stream.poll_complete() {
155                    Ok(Async::NotReady) => (),
156                    Ok(Async::Ready(())) => (),
157                    Err(e) => return Err(e)?,
158                };
159
160                // Poll stream
161                match stream.poll() {
162                    Ok(Async::Ready(None)) => {
163                        // EOF
164                        self.state = ClientState::Disconnected;
165                        Ok(Async::Ready(Some(Event::Disconnected)))
166                    }
167                    Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
168                        self.state = ClientState::Connected(stream);
169                        Ok(Async::Ready(Some(Event::Stanza(stanza))))
170                    }
171                    Ok(Async::NotReady) | Ok(Async::Ready(_)) => {
172                        self.state = ClientState::Connected(stream);
173                        Ok(Async::NotReady)
174                    }
175                    Err(e) => Err(e)?,
176                }
177            }
178        }
179    }
180}
181
182impl Sink for Client {
183    type SinkItem = Element;
184    type SinkError = Error;
185
186    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
187        match self.state {
188            ClientState::Connected(ref mut stream) => match stream.start_send(Packet::Stanza(item))
189            {
190                Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) => Ok(AsyncSink::NotReady(stanza)),
191                Ok(AsyncSink::NotReady(_)) => {
192                    panic!("Client.start_send with stanza but got something else back")
193                }
194                Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready),
195                Err(e) => Err(e)?,
196            },
197            _ => Ok(AsyncSink::NotReady(item)),
198        }
199    }
200
201    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
202        match self.state {
203            ClientState::Connected(ref mut stream) => stream.poll_complete().map_err(|e| e.into()),
204            _ => Ok(Async::Ready(())),
205        }
206    }
207}