lib.rs

  1extern crate futures;
  2extern crate tokio_core;
  3extern crate xml;
  4
  5use std::net::SocketAddr;
  6use std::net::ToSocketAddrs;
  7use std::sync::Arc;
  8use std::io::ErrorKind;
  9use futures::{Future, BoxFuture, Sink, Poll};
 10use futures::stream::{Stream, iter};
 11use futures::future::result;
 12use tokio_core::reactor::Handle;
 13use tokio_core::io::Io;
 14use tokio_core::net::TcpStream;
 15
 16mod xmpp_codec;
 17use xmpp_codec::*;
 18
 19
 20// type FullClient = sasl::Client<StartTLS<TCPConnection>>
 21
 22type Event = ();
 23type Error = std::io::Error;
 24
 25struct TCPStream {
 26    source: Box<Stream<Item=Event, Error=std::io::Error>>,
 27    sink: Arc<Box<futures::stream::SplitSink<tokio_core::io::Framed<tokio_core::net::TcpStream, xmpp_codec::XMPPCodec>>>>,
 28}
 29
 30impl TCPStream {
 31    pub fn connect(addr: &SocketAddr, handle: &Handle) -> BoxFuture<Arc<TCPStream>, std::io::Error> {
 32        TcpStream::connect(addr, handle)
 33            .and_then(|stream| {
 34                let (sink, source) = stream.framed(XMPPCodec::new())
 35                // .framed(UTF8Codec::new())
 36                    .split();
 37                
 38                sink.send(Packet::StreamStart)
 39                    .and_then(|sink| result(Ok((Arc::new(Box::new(sink)), source))))
 40            })
 41            .and_then(|(sink, source)| {
 42                let sink1 = sink.clone();
 43                let source = source
 44                    .map(|items| iter(items.into_iter().map(Ok)))
 45                    .flatten()
 46                    .filter_map(move |pkt| Self::process_packet(pkt, &sink1))
 47                // .for_each(|ev| {
 48                //     match ev {
 49                //         Packet::Stanza
 50                //         _ => (),
 51                //     }
 52                //     Ok(println!("xmpp: {:?}", ev))
 53                // })
 54                // .boxed();
 55                    ;
 56                result(Ok(Arc::new(TCPStream {
 57                    source: Box::new(source),
 58                    sink: sink,
 59                })))
 60            }).boxed()
 61            //.map_err(|e| std::io::Error::new(ErrorKind::Other, e));
 62    }
 63
 64    fn process_packet<S>(pkt: Packet, sink: &Arc<S>) -> Option<Event>
 65        where S: Sink<SinkItem=Packet, SinkError=std::io::Error> {
 66
 67        println!("pkt: {:?}", pkt);
 68        None
 69    }
 70}
 71
 72struct ClientStream {
 73    inner: TCPStream,
 74}
 75
 76impl ClientStream {
 77    pub fn connect(jid: &str, password: &str, handle: &Handle) -> Box<Future<Item=Self, Error=std::io::Error>> {
 78        let addr = "[2a01:4f8:a0:33d0::5]:5222"
 79            .to_socket_addrs().unwrap()
 80            .next().unwrap();
 81        let stream =
 82            TCPStream::connect(&addr, handle)
 83            .and_then(|stream| {
 84                Ok(ClientStream {
 85                    inner: stream
 86                })
 87            });
 88        Box::new(stream)
 89    }
 90}
 91
 92#[cfg(test)]
 93mod tests {
 94    use tokio_core::reactor::Core;
 95
 96    #[test]
 97    fn it_works() {
 98        let mut core = Core::new().unwrap();
 99        let client = super::ClientStream::connect(
100            "astro@spaceboyz.net",
101            "...",
102            &core.handle()
103        ).and_then(|stream| {
104            stream.inner.source.boxed().for_each(|item| {
105                Ok(println!("stream item: {:?}", item))
106            })
107        }).boxed();
108        core.run(client).unwrap();
109    }
110
111    // TODO: test truncated utf8
112}