mod.rs

  1use std::mem::replace;
  2use std::str::FromStr;
  3use std::error::Error;
  4use tokio_core::reactor::Handle;
  5use tokio_core::net::TcpStream;
  6use tokio_io::{AsyncRead, AsyncWrite};
  7use tokio_tls::TlsStream;
  8use futures::*;
  9use jid::{Jid, JidParseError};
 10use xml;
 11use sasl::common::{Credentials, ChannelBinding};
 12
 13use super::xmpp_codec::Packet;
 14use super::xmpp_stream;
 15use super::tcp::TcpClient;
 16use super::starttls::{NS_XMPP_TLS, StartTlsClient};
 17use super::happy_eyeballs::Connecter;
 18
 19mod auth;
 20use self::auth::*;
 21mod bind;
 22use self::bind::*;
 23mod event;
 24pub use self::event::Event as ClientEvent;
 25
 26pub struct Client {
 27    pub jid: Jid,
 28    state: ClientState,
 29}
 30
 31type XMPPStream = xmpp_stream::XMPPStream<TlsStream<TcpStream>>;
 32
 33enum ClientState {
 34    Invalid,
 35    Disconnected,
 36    Connecting(Box<Future<Item=XMPPStream, Error=String>>),
 37    Connected(XMPPStream),
 38}
 39
 40impl Client {
 41    pub fn new(jid: &str, password: &str, handle: Handle) -> Result<Self, JidParseError> {
 42        let jid = try!(Jid::from_str(jid));
 43        let password = password.to_owned();
 44        let connect = Self::make_connect(jid.clone(), password.clone(), handle);
 45        Ok(Client {
 46            jid,
 47            state: ClientState::Connecting(connect),
 48        })
 49    }
 50
 51    fn make_connect(jid: Jid, password: String, handle: Handle) -> Box<Future<Item=XMPPStream, Error=String>> {
 52        let username = jid.node.as_ref().unwrap().to_owned();
 53        let password = password;
 54        Box::new(
 55            Connecter::from_lookup(handle, &jid.domain, "_xmpp-client._tcp", 5222)
 56                .expect("Connector::from_lookup")
 57                .and_then(|tcp_stream|
 58                          TcpClient::from_stream(jid, tcp_stream)
 59                          .map_err(|e| format!("{}", e))
 60                ).and_then(|stream| {
 61                    if Self::can_starttls(&stream) {
 62                        Self::starttls(stream)
 63                    } else {
 64                        panic!("No STARTTLS")
 65                    }
 66                }).and_then(move |stream| {
 67                    Self::auth(stream, username, password).expect("auth")
 68                }).and_then(|stream| {
 69                    Self::bind(stream)
 70                }).and_then(|stream| {
 71                    println!("Bound to {}", stream.jid);
 72                    Ok(stream)
 73                })
 74        )
 75    }
 76
 77    fn can_starttls<S>(stream: &xmpp_stream::XMPPStream<S>) -> bool {
 78        stream.stream_features
 79            .get_child("starttls", Some(NS_XMPP_TLS))
 80            .is_some()
 81    }
 82
 83    fn starttls<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> StartTlsClient<S> {
 84        StartTlsClient::from_stream(stream)
 85    }
 86
 87    fn auth<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>, username: String, password: String) -> Result<ClientAuth<S>, String> {
 88        let creds = Credentials::default()
 89            .with_username(username)
 90            .with_password(password)
 91            .with_channel_binding(ChannelBinding::None);
 92        ClientAuth::new(stream, creds)
 93    }
 94
 95    fn bind<S: AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> ClientBind<S> {
 96        ClientBind::new(stream)
 97    }
 98}
 99
100impl Stream for Client {
101    type Item = ClientEvent;
102    type Error = String;
103
104    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
105        let state = replace(&mut self.state, ClientState::Invalid);
106
107        match state {
108            ClientState::Invalid =>
109                Err("invalid client state".to_owned()),
110            ClientState::Disconnected =>
111                Ok(Async::Ready(None)),
112            ClientState::Connecting(mut connect) => {
113                match connect.poll() {
114                    Ok(Async::Ready(stream)) => {
115                        self.state = ClientState::Connected(stream);
116                        Ok(Async::Ready(Some(ClientEvent::Online)))
117                    },
118                    Ok(Async::NotReady) => {
119                        self.state = ClientState::Connecting(connect);
120                        Ok(Async::NotReady)
121                    },
122                    Err(e) =>
123                        Err(e),
124                }
125            },
126            ClientState::Connected(mut stream) => {
127                match stream.poll() {
128                    Ok(Async::NotReady) => {
129                        self.state = ClientState::Connected(stream);
130                        Ok(Async::NotReady)
131                    },
132                    Ok(Async::Ready(None)) => {
133                        // EOF
134                        self.state = ClientState::Disconnected;
135                        Ok(Async::Ready(Some(ClientEvent::Disconnected)))
136                    },
137                    Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
138                        self.state = ClientState::Connected(stream);
139                        Ok(Async::Ready(Some(ClientEvent::Stanza(stanza))))
140                    },
141                    Ok(Async::Ready(_)) => {
142                        self.state = ClientState::Connected(stream);
143                        Ok(Async::NotReady)
144                    },
145                    Err(e) =>
146                        Err(e.description().to_owned()),
147                }
148            },
149        }
150    }
151}
152
153impl Sink for Client {
154    type SinkItem = xml::Element;
155    type SinkError = String;
156
157    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
158        match self.state {
159            ClientState::Connected(ref mut stream) =>
160                match stream.start_send(Packet::Stanza(item)) {
161                    Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) =>
162                        Ok(AsyncSink::NotReady(stanza)),
163                    Ok(AsyncSink::NotReady(_)) =>
164                        panic!("Client.start_send with stanza but got something else back"),
165                    Ok(AsyncSink::Ready) => {
166                        Ok(AsyncSink::Ready)
167                    },
168                    Err(e) =>
169                        Err(e.description().to_owned()),
170                },
171            _ =>
172                Ok(AsyncSink::NotReady(item)),
173        }
174    }
175
176    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
177        match &mut self.state {
178            &mut ClientState::Connected(ref mut stream) =>
179                stream.poll_complete()
180                .map_err(|e| e.description().to_owned()),
181            _ =>
182                Ok(Async::Ready(())),
183        }
184    }
185}