1#[macro_use]
2extern crate futures;
3extern crate tokio_core;
4extern crate xml;
5
6use std::net::SocketAddr;
7use std::net::ToSocketAddrs;
8use std::sync::Arc;
9use std::io::ErrorKind;
10use futures::{Future, BoxFuture, Sink, Poll, Async};
11use futures::stream::{Stream, iter};
12use futures::future::result;
13use tokio_core::reactor::Handle;
14use tokio_core::io::Io;
15use tokio_core::net::{TcpStream, TcpStreamNew};
16
17mod xmpp_codec;
18use xmpp_codec::*;
19
20
21// type FullClient = sasl::Client<StartTLS<TCPConnection>>
22
23#[derive(Debug)]
24pub struct TcpClient {
25 state: TcpClientState,
26}
27
28enum TcpClientState {
29 Connecting(TcpStreamNew),
30 SendStart(futures::sink::Send<XMPPStream<TcpStream>>),
31 RecvStart(Option<XMPPStream<TcpStream>>),
32 Established,
33 Invalid,
34}
35
36impl std::fmt::Debug for TcpClientState {
37 fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
38 let s = match *self {
39 TcpClientState::Connecting(_) => "Connecting",
40 TcpClientState::SendStart(_) => "SendStart",
41 TcpClientState::RecvStart(_) => "RecvStart",
42 TcpClientState::Established => "Established",
43 TcpClientState::Invalid => "Invalid",
44 };
45 write!(fmt, "{}", s)
46 }
47}
48
49impl TcpClient {
50 pub fn connect(addr: &SocketAddr, handle: &Handle) -> Self {
51 let tcp_stream_new = TcpStream::connect(addr, handle);
52 TcpClient {
53 state: TcpClientState::Connecting(tcp_stream_new),
54 }
55 }
56}
57
58impl Future for TcpClient {
59 type Item = XMPPStream<TcpStream>;
60 type Error = std::io::Error;
61
62 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
63 let (new_state, result) = match self.state {
64 TcpClientState::Connecting(ref mut tcp_stream_new) => {
65 let tcp_stream = try_ready!(tcp_stream_new.poll());
66 let xmpp_stream = tcp_stream.framed(XMPPCodec::new());
67 let send = xmpp_stream.send(Packet::StreamStart);
68 let new_state = TcpClientState::SendStart(send);
69 (new_state, Ok(Async::NotReady))
70 },
71 TcpClientState::SendStart(ref mut send) => {
72 let xmpp_stream = try_ready!(send.poll());
73 let new_state = TcpClientState::RecvStart(Some(xmpp_stream));
74 (new_state, Ok(Async::NotReady))
75 },
76 TcpClientState::RecvStart(ref mut opt_xmpp_stream) => {
77 let mut xmpp_stream = opt_xmpp_stream.take().unwrap();
78 match xmpp_stream.poll() {
79 Ok(Async::Ready(Some(events))) => println!("Recv start: {:?}", events),
80 Ok(Async::Ready(_)) => return Err(std::io::Error::from(ErrorKind::InvalidData)),
81 Ok(Async::NotReady) => {
82 *opt_xmpp_stream = Some(xmpp_stream);
83 return Ok(Async::NotReady);
84 },
85 Err(e) => return Err(e)
86 };
87 let new_state = TcpClientState::Established;
88 (new_state, Ok(Async::Ready(xmpp_stream)))
89 },
90 TcpClientState::Established | TcpClientState::Invalid =>
91 unreachable!(),
92 };
93
94 println!("Next state: {:?}", new_state);
95 self.state = new_state;
96 match result {
97 // by polling again, we register new future
98 Ok(Async::NotReady) => self.poll(),
99 result => result
100 }
101 }
102}
103
#[cfg(test)]
mod tests {
    use tokio_core::reactor::Core;
    use futures::{Future, Stream};

    /// Connects to a hard-coded address and prints every item the stream
    /// yields. Needs network access to that host to pass.
    #[test]
    fn it_works() {
        use std::net::ToSocketAddrs;

        let mut resolved = "[2a01:4f8:a0:33d0::5]:5222"
            .to_socket_addrs()
            .unwrap();
        let addr = resolved.next().unwrap();

        let mut core = Core::new().unwrap();
        let handle = core.handle();
        let client = super::TcpClient::connect(&addr, &handle)
            .and_then(|stream| {
                stream.for_each(|item| {
                    println!("stream item: {:?}", item);
                    Ok(())
                })
            });
        core.run(client).unwrap();
    }

    // TODO: test truncated utf8
}