lib.rs

  1#[macro_use]
  2extern crate futures;
  3extern crate tokio_core;
  4extern crate xml;
  5
  6use std::net::SocketAddr;
  7use std::net::ToSocketAddrs;
  8use std::sync::Arc;
  9use std::io::ErrorKind;
 10use futures::{Future, BoxFuture, Sink, Poll, Async};
 11use futures::stream::{Stream, iter};
 12use futures::future::result;
 13use tokio_core::reactor::Handle;
 14use tokio_core::io::Io;
 15use tokio_core::net::{TcpStream, TcpStreamNew};
 16
 17mod xmpp_codec;
 18use xmpp_codec::*;
 19
 20
 21// type FullClient = sasl::Client<StartTLS<TCPConnection>>
 22
 23#[derive(Debug)]
 24pub struct TcpClient {
 25    state: TcpClientState,
 26}
 27
 28enum TcpClientState {
 29    Connecting(TcpStreamNew),
 30    SendStart(futures::sink::Send<XMPPStream<TcpStream>>),
 31    RecvStart(Option<XMPPStream<TcpStream>>),
 32    Established,
 33    Invalid,
 34}
 35
 36impl std::fmt::Debug for TcpClientState {
 37    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
 38        let s = match *self {
 39            TcpClientState::Connecting(_) => "Connecting",
 40            TcpClientState::SendStart(_) => "SendStart",
 41            TcpClientState::RecvStart(_) => "RecvStart",
 42            TcpClientState::Established => "Established",
 43            TcpClientState::Invalid => "Invalid",
 44        };
 45        write!(fmt, "{}", s)
 46    }
 47}
 48
 49impl TcpClient {
 50    pub fn connect(addr: &SocketAddr, handle: &Handle) -> Self {
 51        let tcp_stream_new = TcpStream::connect(addr, handle);
 52        TcpClient {
 53            state: TcpClientState::Connecting(tcp_stream_new),
 54        }
 55    }
 56}
 57
 58impl Future for TcpClient {
 59    type Item = XMPPStream<TcpStream>;
 60    type Error = std::io::Error;
 61
 62    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
 63        let (new_state, result) = match self.state {
 64            TcpClientState::Connecting(ref mut tcp_stream_new) => {
 65                let tcp_stream = try_ready!(tcp_stream_new.poll());
 66                let xmpp_stream = tcp_stream.framed(XMPPCodec::new());
 67                let send = xmpp_stream.send(Packet::StreamStart);
 68                let new_state = TcpClientState::SendStart(send);
 69                (new_state, Ok(Async::NotReady))
 70            },
 71            TcpClientState::SendStart(ref mut send) => {
 72                let xmpp_stream = try_ready!(send.poll());
 73                let new_state = TcpClientState::RecvStart(Some(xmpp_stream));
 74                (new_state, Ok(Async::NotReady))
 75            },
 76            TcpClientState::RecvStart(ref mut opt_xmpp_stream) => {
 77                let mut xmpp_stream = opt_xmpp_stream.take().unwrap();
 78                match xmpp_stream.poll() {
 79                    Ok(Async::Ready(Some(events))) => println!("Recv start: {:?}", events),
 80                    Ok(Async::Ready(_)) => return Err(std::io::Error::from(ErrorKind::InvalidData)),
 81                    Ok(Async::NotReady) => {
 82                        *opt_xmpp_stream = Some(xmpp_stream);
 83                        return Ok(Async::NotReady);
 84                    },
 85                    Err(e) => return Err(e)
 86                };
 87                let new_state = TcpClientState::Established;
 88                (new_state, Ok(Async::Ready(xmpp_stream)))
 89            },
 90            TcpClientState::Established | TcpClientState::Invalid =>
 91                unreachable!(),
 92        };
 93
 94        println!("Next state: {:?}", new_state);
 95        self.state = new_state;
 96	match result {
 97	    // by polling again, we register new future
 98	    Ok(Async::NotReady) => self.poll(),
 99	    result => result
100	}
101    }
102}
103
#[cfg(test)]
mod tests {
    use tokio_core::reactor::Core;
    use futures::{Future, Stream};

    /// Connects to a hard-coded IPv6 address on port 5222 and prints every
    /// item the resulting stream yields. Needs network access to that host.
    #[test]
    fn it_works() {
        let addr: std::net::SocketAddr = "[2a01:4f8:a0:33d0::5]:5222".parse().unwrap();

        let mut core = Core::new().unwrap();
        let handle = core.handle();
        let client = super::TcpClient::connect(&addr, &handle).and_then(|stream| {
            stream.for_each(|item| {
                println!("stream item: {:?}", item);
                Ok(())
            })
        });
        core.run(client).unwrap();
    }

    // TODO: test truncated utf8
}