tcp.rs

 1use std::net::SocketAddr;
 2use std::io::Error;
 3use futures::{Future, Poll, Async};
 4use tokio_core::reactor::Handle;
 5use tokio_core::net::{TcpStream, TcpStreamNew};
 6
 7use xmpp_stream::*;
 8use stream_start::StreamStart;
 9
10pub struct TcpClient {
11    state: TcpClientState,
12}
13
14enum TcpClientState {
15    Connecting(TcpStreamNew),
16    Start(StreamStart<TcpStream>),
17    Established,
18}
19
20impl TcpClient {
21    pub fn connect(addr: &SocketAddr, handle: &Handle) -> Self {
22        let tcp_stream_new = TcpStream::connect(addr, handle);
23        TcpClient {
24            state: TcpClientState::Connecting(tcp_stream_new),
25        }
26    }
27}
28
29impl Future for TcpClient {
30    type Item = XMPPStream<TcpStream>;
31    type Error = Error;
32
33    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
34        let (new_state, result) = match self.state {
35            TcpClientState::Connecting(ref mut tcp_stream_new) => {
36                let tcp_stream = try_ready!(tcp_stream_new.poll());
37                let start = XMPPStream::from_stream(tcp_stream, "spaceboyz.net".to_owned());
38                let new_state = TcpClientState::Start(start);
39                (new_state, Ok(Async::NotReady))
40            },
41            TcpClientState::Start(ref mut start) => {
42                let xmpp_stream = try_ready!(start.poll());
43                let new_state = TcpClientState::Established;
44                (new_state, Ok(Async::Ready(xmpp_stream)))
45            },
46            TcpClientState::Established =>
47                unreachable!(),
48        };
49
50        self.state = new_state;
51	match result {
52	    // by polling again, we register new future
53	    Ok(Async::NotReady) => self.poll(),
54	    result => result
55	}
56    }
57}