mod.rs

  1use std::mem::replace;
  2use std::str::FromStr;
  3use std::error::Error;
  4use tokio_core::reactor::Handle;
  5use tokio_core::net::TcpStream;
  6use tokio_io::{AsyncRead, AsyncWrite};
  7use tokio_tls::TlsStream;
  8use futures::*;
  9use jid::{Jid, JidParseError};
 10use xml;
 11use sasl::common::{Credentials, ChannelBinding};
 12
 13use super::xmpp_codec::Packet;
 14use super::xmpp_stream;
 15use super::tcp::TcpClient;
 16use super::starttls::{NS_XMPP_TLS, StartTlsClient};
 17
 18mod auth;
 19use self::auth::*;
 20mod bind;
 21use self::bind::*;
 22
 23pub struct Client {
 24    pub jid: Jid,
 25    state: ClientState,
 26}
 27
 28type XMPPStream = xmpp_stream::XMPPStream<TlsStream<TcpStream>>;
 29
 30enum ClientState {
 31    Invalid,
 32    Disconnected,
 33    Connecting(Box<Future<Item=XMPPStream, Error=String>>),
 34    Connected(XMPPStream),
 35}
 36
 37impl Client {
 38    pub fn new(jid: &str, password: &str, handle: &Handle) -> Result<Self, JidParseError> {
 39        let jid = try!(Jid::from_str(jid));
 40        let password = password.to_owned();
 41        let connect = Self::make_connect(jid.clone(), password.clone(), handle);
 42        Ok(Client {
 43            jid,
 44            state: ClientState::Connecting(connect),
 45        })
 46    }
 47
 48    fn make_connect(jid: Jid, password: String, handle: &Handle) -> Box<Future<Item=XMPPStream, Error=String>> {
 49        // TODO: implement proper DNS SRV lookup
 50        use std::net::ToSocketAddrs;
 51        let addr = "89.238.79.220:5222"
 52            .to_socket_addrs().unwrap()
 53            .next().unwrap();
 54        let username = jid.node.as_ref().unwrap().to_owned();
 55        let password = password;
 56        Box::new(
 57            TcpClient::connect(
 58                jid,
 59                &addr,
 60                handle
 61            ).map_err(|e| format!("{}", e)
 62            ).and_then(|stream| {
 63                if Self::can_starttls(&stream) {
 64                    Self::starttls(stream)
 65                } else {
 66                    panic!("No STARTTLS")
 67                }
 68            }).and_then(move |stream| {
 69                Self::auth(stream, username, password).expect("auth")
 70            }).and_then(|stream| {
 71                Self::bind(stream)
 72            }).and_then(|stream| {
 73                println!("Bound to {}", stream.jid);
 74                Ok(stream)
 75            })
 76        )
 77    }
 78
 79    fn can_starttls<S>(stream: &xmpp_stream::XMPPStream<S>) -> bool {
 80        stream.stream_features
 81            .get_child("starttls", Some(NS_XMPP_TLS))
 82            .is_some()
 83    }
 84
 85    fn starttls<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> StartTlsClient<S> {
 86        StartTlsClient::from_stream(stream)
 87    }
 88
 89    fn auth<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>, username: String, password: String) -> Result<ClientAuth<S>, String> {
 90        let creds = Credentials::default()
 91            .with_username(username)
 92            .with_password(password)
 93            .with_channel_binding(ChannelBinding::None);
 94        ClientAuth::new(stream, creds)
 95    }
 96
 97    fn bind<S: AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> ClientBind<S> {
 98        ClientBind::new(stream)
 99    }
100}
101
102#[derive(Debug)]
103pub enum ClientEvent {
104    Online,
105    Disconnected,
106    Stanza(xml::Element),
107}
108
109impl Stream for Client {
110    type Item = ClientEvent;
111    type Error = String;
112
113    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
114        let state = replace(&mut self.state, ClientState::Invalid);
115
116        match state {
117            ClientState::Invalid =>
118                Err("invalid client state".to_owned()),
119            ClientState::Disconnected =>
120                Ok(Async::Ready(None)),
121            ClientState::Connecting(mut connect) => {
122                match connect.poll() {
123                    Ok(Async::Ready(stream)) => {
124                        self.state = ClientState::Connected(stream);
125                        Ok(Async::Ready(Some(ClientEvent::Online)))
126                    },
127                    Ok(Async::NotReady) => {
128                        self.state = ClientState::Connecting(connect);
129                        Ok(Async::NotReady)
130                    },
131                    Err(e) =>
132                        Err(e),
133                }
134            },
135            ClientState::Connected(mut stream) => {
136                match stream.poll() {
137                    Ok(Async::NotReady) => {
138                        self.state = ClientState::Connected(stream);
139                        Ok(Async::NotReady)
140                    },
141                    Ok(Async::Ready(None)) => {
142                        // EOF
143                        self.state = ClientState::Disconnected;
144                        Ok(Async::Ready(Some(ClientEvent::Disconnected)))
145                    },
146                    Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
147                        self.state = ClientState::Connected(stream);
148                        Ok(Async::Ready(Some(ClientEvent::Stanza(stanza))))
149                    },
150                    Ok(Async::Ready(_)) => {
151                        self.state = ClientState::Connected(stream);
152                        Ok(Async::NotReady)
153                    },
154                    Err(e) =>
155                        Err(e.description().to_owned()),
156                }
157            },
158        }
159    }
160}
161
162impl Sink for Client {
163    type SinkItem = xml::Element;
164    type SinkError = String;
165
166    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
167        match self.state {
168            ClientState::Connected(ref mut stream) =>
169                match stream.start_send(Packet::Stanza(item)) {
170                    Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) =>
171                        Ok(AsyncSink::NotReady(stanza)),
172                    Ok(AsyncSink::NotReady(_)) =>
173                        panic!("Client.start_send with stanza but got something else back"),
174                    Ok(AsyncSink::Ready) => {
175                        Ok(AsyncSink::Ready)
176                    },
177                    Err(e) =>
178                        Err(e.description().to_owned()),
179                },
180            _ =>
181                Ok(AsyncSink::NotReady(item)),
182        }
183    }
184
185    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
186        match &mut self.state {
187            &mut ClientState::Connected(ref mut stream) =>
188                stream.poll_complete()
189                .map_err(|e| e.description().to_owned()),
190            _ =>
191                Ok(Async::Ready(())),
192        }
193    }
194}