mod.rs

  1use std::mem::replace;
  2use std::str::FromStr;
  3use std::error::Error;
  4use tokio_core::reactor::Handle;
  5use tokio_core::net::TcpStream;
  6use tokio_io::{AsyncRead, AsyncWrite};
  7use tokio_tls::TlsStream;
  8use futures::{future, Future, Stream, Poll, Async, Sink, StartSend, AsyncSink};
  9use minidom::Element;
 10use jid::{Jid, JidParseError};
 11use sasl::common::{Credentials, ChannelBinding};
 12use idna;
 13
 14use super::xmpp_codec::Packet;
 15use super::xmpp_stream;
 16use super::starttls::{NS_XMPP_TLS, StartTlsClient};
 17use super::happy_eyeballs::Connecter;
 18use super::event::Event;
 19
 20mod auth;
 21use self::auth::ClientAuth;
 22mod bind;
 23use self::bind::ClientBind;
 24
 25/// XMPP client connection and state
 26pub struct Client {
 27    /// The client's current Jabber-Id
 28    pub jid: Jid,
 29    state: ClientState,
 30}
 31
 32type XMPPStream = xmpp_stream::XMPPStream<TlsStream<TcpStream>>;
 33const NS_JABBER_CLIENT: &str = "jabber:client";
 34
 35enum ClientState {
 36    Invalid,
 37    Disconnected,
 38    Connecting(Box<Future<Item=XMPPStream, Error=String>>),
 39    Connected(XMPPStream),
 40}
 41
 42impl Client {
 43    /// Start a new XMPP client
 44    ///
 45    /// Start polling the returned instance so that it will connect
 46    /// and yield events.
 47    pub fn new(jid: &str, password: &str, handle: Handle) -> Result<Self, JidParseError> {
 48        let jid = Jid::from_str(jid)?;
 49        let password = password.to_owned();
 50        let connect = Self::make_connect(jid.clone(), password.clone(), handle);
 51        Ok(Client {
 52            jid,
 53            state: ClientState::Connecting(connect),
 54        })
 55    }
 56
 57    fn make_connect(jid: Jid, password: String, handle: Handle) -> Box<Future<Item=XMPPStream, Error=String>> {
 58        let username = jid.node.as_ref().unwrap().to_owned();
 59        let jid1 = jid.clone();
 60        let jid2 = jid.clone();
 61        let password = password;
 62        let domain = match idna::domain_to_ascii(&jid.domain) {
 63            Ok(domain) =>
 64                domain,
 65            Err(e) =>
 66                return Box::new(future::err(format!("{:?}", e))),
 67        };
 68        Box::new(
 69            Connecter::from_lookup(handle, &domain, "_xmpp-client._tcp", 5222)
 70                .expect("Connector::from_lookup")
 71                .and_then(move |tcp_stream|
 72                          xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_CLIENT.to_owned())
 73                          .map_err(|e| format!("{}", e))
 74                ).and_then(|xmpp_stream| {
 75                    if Self::can_starttls(&xmpp_stream) {
 76                        Ok(Self::starttls(xmpp_stream))
 77                    } else {
 78                        Err("No STARTTLS".to_owned())
 79                    }
 80                }).and_then(|starttls|
 81                            starttls
 82                ).and_then(|tls_stream|
 83                          XMPPStream::start(tls_stream, jid2, NS_JABBER_CLIENT.to_owned())
 84                          .map_err(|e| format!("{}", e))
 85                ).and_then(move |xmpp_stream| {
 86                    Self::auth(xmpp_stream, username, password).expect("auth")
 87                }).and_then(|xmpp_stream| {
 88                    Self::bind(xmpp_stream)
 89                }).and_then(|xmpp_stream| {
 90                    // println!("Bound to {}", xmpp_stream.jid);
 91                    Ok(xmpp_stream)
 92                })
 93        )
 94    }
 95
 96    fn can_starttls<S>(stream: &xmpp_stream::XMPPStream<S>) -> bool {
 97        stream.stream_features
 98            .get_child("starttls", NS_XMPP_TLS)
 99            .is_some()
100    }
101
102    fn starttls<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> StartTlsClient<S> {
103        StartTlsClient::from_stream(stream)
104    }
105
106    fn auth<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>, username: String, password: String) -> Result<ClientAuth<S>, String> {
107        let creds = Credentials::default()
108            .with_username(username)
109            .with_password(password)
110            .with_channel_binding(ChannelBinding::None);
111        ClientAuth::new(stream, creds)
112    }
113
114    fn bind<S: AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> ClientBind<S> {
115        ClientBind::new(stream)
116    }
117}
118
119impl Stream for Client {
120    type Item = Event;
121    type Error = String;
122
123    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
124        let state = replace(&mut self.state, ClientState::Invalid);
125
126        match state {
127            ClientState::Invalid =>
128                Err("invalid client state".to_owned()),
129            ClientState::Disconnected =>
130                Ok(Async::Ready(None)),
131            ClientState::Connecting(mut connect) => {
132                match connect.poll() {
133                    Ok(Async::Ready(stream)) => {
134                        self.state = ClientState::Connected(stream);
135                        Ok(Async::Ready(Some(Event::Online)))
136                    },
137                    Ok(Async::NotReady) => {
138                        self.state = ClientState::Connecting(connect);
139                        Ok(Async::NotReady)
140                    },
141                    Err(e) =>
142                        Err(e),
143                }
144            },
145            ClientState::Connected(mut stream) => {
146                // Poll sink
147                match stream.poll_complete() {
148                    Ok(Async::NotReady) => (),
149                    Ok(Async::Ready(())) => (),
150                    Err(e) =>
151                        return Err(e.description().to_owned()),
152                };
153
154                // Poll stream
155                match stream.poll() {
156                    Ok(Async::Ready(None)) => {
157                        // EOF
158                        self.state = ClientState::Disconnected;
159                        Ok(Async::Ready(Some(Event::Disconnected)))
160                    },
161                    Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
162                        self.state = ClientState::Connected(stream);
163                        Ok(Async::Ready(Some(Event::Stanza(stanza))))
164                    },
165                    Ok(Async::NotReady) |
166                    Ok(Async::Ready(_)) => {
167                        self.state = ClientState::Connected(stream);
168                        Ok(Async::NotReady)
169                    },
170                    Err(e) =>
171                        Err(e.description().to_owned()),
172                }
173            },
174        }
175    }
176}
177
178impl Sink for Client {
179    type SinkItem = Element;
180    type SinkError = String;
181
182    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
183        match self.state {
184            ClientState::Connected(ref mut stream) =>
185                match stream.start_send(Packet::Stanza(item)) {
186                    Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) =>
187                        Ok(AsyncSink::NotReady(stanza)),
188                    Ok(AsyncSink::NotReady(_)) =>
189                        panic!("Client.start_send with stanza but got something else back"),
190                    Ok(AsyncSink::Ready) => {
191                        Ok(AsyncSink::Ready)
192                    },
193                    Err(e) =>
194                        Err(e.description().to_owned()),
195                },
196            _ =>
197                Ok(AsyncSink::NotReady(item)),
198        }
199    }
200
201    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
202        match self.state {
203            ClientState::Connected(ref mut stream) =>
204                stream.poll_complete()
205                .map_err(|e| e.description().to_owned()),
206            _ =>
207                Ok(Async::Ready(())),
208        }
209    }
210}