tcp.rs

 1use std::net::SocketAddr;
 2use std::io::Error;
 3use futures::{Future, Poll, Async};
 4use tokio_core::reactor::Handle;
 5use tokio_core::net::{TcpStream, TcpStreamNew};
 6use jid::Jid;
 7
 8use xmpp_stream::*;
 9use stream_start::StreamStart;
10
11pub struct TcpClient {
12    state: TcpClientState,
13    jid: Jid,
14}
15
16enum TcpClientState {
17    Connecting(TcpStreamNew),
18    Start(StreamStart<TcpStream>),
19    Established,
20}
21
22impl TcpClient {
23    pub fn connect(jid: Jid, addr: &SocketAddr, handle: &Handle) -> Self {
24        let tcp_stream_new = TcpStream::connect(addr, handle);
25        TcpClient {
26            state: TcpClientState::Connecting(tcp_stream_new),
27            jid,
28        }
29    }
30}
31
32impl Future for TcpClient {
33    type Item = XMPPStream<TcpStream>;
34    type Error = Error;
35
36    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
37        let (new_state, result) = match self.state {
38            TcpClientState::Connecting(ref mut tcp_stream_new) => {
39                let tcp_stream = try_ready!(tcp_stream_new.poll());
40                let start = XMPPStream::from_stream(tcp_stream, self.jid.clone());
41                let new_state = TcpClientState::Start(start);
42                (new_state, Ok(Async::NotReady))
43            },
44            TcpClientState::Start(ref mut start) => {
45                let xmpp_stream = try_ready!(start.poll());
46                let new_state = TcpClientState::Established;
47                (new_state, Ok(Async::Ready(xmpp_stream)))
48            },
49            TcpClientState::Established =>
50                unreachable!(),
51        };
52
53        self.state = new_state;
54        match result {
55            // by polling again, we register new future
56            Ok(Async::NotReady) => self.poll(),
57            result => result
58        }
59    }
60}