1extern crate futures;
2extern crate tokio_core;
3extern crate xml;
4
5use std::net::SocketAddr;
6use std::net::ToSocketAddrs;
7use std::sync::Arc;
8use std::io::ErrorKind;
9use futures::{Future, BoxFuture, Sink, Poll};
10use futures::stream::{Stream, iter};
11use futures::future::result;
12use tokio_core::reactor::Handle;
13use tokio_core::io::Io;
14use tokio_core::net::TcpStream;
15
16mod xmpp_codec;
17use xmpp_codec::*;
18
19
20// type FullClient = sasl::Client<StartTLS<TCPConnection>>
21
/// Item type produced by the stream exposed to consumers; currently the unit type.
type Event = ();
/// Error type used throughout this module; an alias for `std::io::Error`.
type Error = std::io::Error;
24
25struct TCPStream {
26 source: Box<Stream<Item=Event, Error=std::io::Error>>,
27 sink: Arc<Box<futures::stream::SplitSink<tokio_core::io::Framed<tokio_core::net::TcpStream, xmpp_codec::XMPPCodec>>>>,
28}
29
30impl TCPStream {
31 pub fn connect(addr: &SocketAddr, handle: &Handle) -> BoxFuture<Arc<TCPStream>, std::io::Error> {
32 TcpStream::connect(addr, handle)
33 .and_then(|stream| {
34 let (sink, source) = stream.framed(XMPPCodec::new())
35 // .framed(UTF8Codec::new())
36 .split();
37
38 sink.send(Packet::StreamStart)
39 .and_then(|sink| result(Ok((Arc::new(Box::new(sink)), source))))
40 })
41 .and_then(|(sink, source)| {
42 let sink1 = sink.clone();
43 let source = source
44 .map(|items| iter(items.into_iter().map(Ok)))
45 .flatten()
46 .filter_map(move |pkt| Self::process_packet(pkt, &sink1))
47 // .for_each(|ev| {
48 // match ev {
49 // Packet::Stanza
50 // _ => (),
51 // }
52 // Ok(println!("xmpp: {:?}", ev))
53 // })
54 // .boxed();
55 ;
56 result(Ok(Arc::new(TCPStream {
57 source: Box::new(source),
58 sink: sink,
59 })))
60 }).boxed()
61 //.map_err(|e| std::io::Error::new(ErrorKind::Other, e));
62 }
63
64 fn process_packet<S>(pkt: Packet, sink: &Arc<S>) -> Option<Event>
65 where S: Sink<SinkItem=Packet, SinkError=std::io::Error> {
66
67 println!("pkt: {:?}", pkt);
68 None
69 }
70}
71
72struct ClientStream {
73 inner: TCPStream,
74}
75
76impl ClientStream {
77 pub fn connect(jid: &str, password: &str, handle: &Handle) -> Box<Future<Item=Self, Error=std::io::Error>> {
78 let addr = "[2a01:4f8:a0:33d0::5]:5222"
79 .to_socket_addrs().unwrap()
80 .next().unwrap();
81 let stream =
82 TCPStream::connect(&addr, handle)
83 .and_then(|stream| {
84 Ok(ClientStream {
85 inner: stream
86 })
87 });
88 Box::new(stream)
89 }
90}
91
#[cfg(test)]
mod tests {
    // `and_then` and `for_each` are trait methods, so both traits must be in
    // scope inside this module.
    use futures::{Future, Stream};
    use tokio_core::reactor::Core;

    /// Connects to the server and prints every item the stream yields.
    ///
    /// This opens a real TCP connection, so it needs network access.
    #[test]
    fn it_works() {
        let mut core = Core::new().unwrap();
        let client = super::ClientStream::connect(
            "astro@spaceboyz.net",
            "...",
            &core.handle()
        ).and_then(|stream| {
            // `source` is already a boxed stream without a `Send` bound, so it
            // is consumed directly rather than re-boxed with `boxed()`.
            stream.inner.source.for_each(|item| {
                println!("stream item: {:?}", item);
                Ok(())
            })
        });
        // `Core::run` accepts any future, so no boxing is needed here either.
        core.run(client).unwrap();
    }

    // TODO: test truncated utf8
}