this kinda works

Astro created

Change summary

Cargo.toml        |   6 +-
src/lib.rs        | 147 +++++++++++++++++++++++++++---------------------
src/xmpp_codec.rs |   5 +
3 files changed, 89 insertions(+), 69 deletions(-)

Detailed changes

Cargo.toml 🔗

@@ -4,6 +4,6 @@ version = "0.1.0"
 authors = ["Astro <astro@spaceboyz.net>"]
 
 [dependencies]
-futures = "0.1.6"
-tokio-core = "0.1.1"
-RustyXML = "0.1.1"
+futures = "*"
+tokio-core = "*"
+RustyXML = "*"

src/lib.rs 🔗

@@ -1,3 +1,4 @@
+#[macro_use]
 extern crate futures;
 extern crate tokio_core;
 extern crate xml;
@@ -6,12 +7,12 @@ use std::net::SocketAddr;
 use std::net::ToSocketAddrs;
 use std::sync::Arc;
 use std::io::ErrorKind;
-use futures::{Future, BoxFuture, Sink, Poll};
+use futures::{Future, BoxFuture, Sink, Poll, Async};
 use futures::stream::{Stream, iter};
 use futures::future::result;
 use tokio_core::reactor::Handle;
 use tokio_core::io::Io;
-use tokio_core::net::TcpStream;
+use tokio_core::net::{TcpStream, TcpStreamNew};
 
 mod xmpp_codec;
 use xmpp_codec::*;
@@ -19,92 +20,108 @@ use xmpp_codec::*;
 
 // type FullClient = sasl::Client<StartTLS<TCPConnection>>
 
-type Event = ();
-type Error = std::io::Error;
+#[derive(Debug)]
+pub struct TcpClient {
+    state: TcpClientState,
+}
 
-struct TCPStream {
-    source: Box<Stream<Item=Event, Error=std::io::Error>>,
-    sink: Arc<Box<futures::stream::SplitSink<tokio_core::io::Framed<tokio_core::net::TcpStream, xmpp_codec::XMPPCodec>>>>,
+enum TcpClientState {
+    Connecting(TcpStreamNew),
+    SendStart(futures::sink::Send<XMPPStream<TcpStream>>),
+    RecvStart(Option<XMPPStream<TcpStream>>),
+    Established,
+    Invalid,
 }
 
-impl TCPStream {
-    pub fn connect(addr: &SocketAddr, handle: &Handle) -> BoxFuture<Arc<TCPStream>, std::io::Error> {
-        TcpStream::connect(addr, handle)
-            .and_then(|stream| {
-                let (sink, source) = stream.framed(XMPPCodec::new())
-                // .framed(UTF8Codec::new())
-                    .split();
-                
-                sink.send(Packet::StreamStart)
-                    .and_then(|sink| result(Ok((Arc::new(Box::new(sink)), source))))
-            })
-            .and_then(|(sink, source)| {
-                let sink1 = sink.clone();
-                let source = source
-                    .map(|items| iter(items.into_iter().map(Ok)))
-                    .flatten()
-                    .filter_map(move |pkt| Self::process_packet(pkt, &sink1))
-                // .for_each(|ev| {
-                //     match ev {
-                //         Packet::Stanza
-                //         _ => (),
-                //     }
-                //     Ok(println!("xmpp: {:?}", ev))
-                // })
-                // .boxed();
-                    ;
-                result(Ok(Arc::new(TCPStream {
-                    source: Box::new(source),
-                    sink: sink,
-                })))
-            }).boxed()
-            //.map_err(|e| std::io::Error::new(ErrorKind::Other, e));
+impl std::fmt::Debug for TcpClientState {
+    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
+        let s = match *self {
+            TcpClientState::Connecting(_) => "Connecting",
+            TcpClientState::SendStart(_) => "SendStart",
+            TcpClientState::RecvStart(_) => "RecvStart",
+            TcpClientState::Established => "Established",
+            TcpClientState::Invalid => "Invalid",
+        };
+        write!(fmt, "{}", s)
     }
+}
 
-    fn process_packet<S>(pkt: Packet, sink: &Arc<S>) -> Option<Event>
-        where S: Sink<SinkItem=Packet, SinkError=std::io::Error> {
-
-        println!("pkt: {:?}", pkt);
-        None
+impl TcpClient {
+    pub fn connect(addr: &SocketAddr, handle: &Handle) -> Self {
+        let tcp_stream_new = TcpStream::connect(addr, handle);
+        TcpClient {
+            state: TcpClientState::Connecting(tcp_stream_new),
+        }
     }
 }
 
-struct ClientStream {
-    inner: TCPStream,
-}
+impl Future for TcpClient {
+    type Item = XMPPStream<TcpStream>;
+    type Error = std::io::Error;
 
-impl ClientStream {
-    pub fn connect(jid: &str, password: &str, handle: &Handle) -> Box<Future<Item=Self, Error=std::io::Error>> {
-        let addr = "[2a01:4f8:a0:33d0::5]:5222"
-            .to_socket_addrs().unwrap()
-            .next().unwrap();
-        let stream =
-            TCPStream::connect(&addr, handle)
-            .and_then(|stream| {
-                Ok(ClientStream {
-                    inner: stream
-                })
-            });
-        Box::new(stream)
+    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+        let (new_state, result) = match self.state {
+            TcpClientState::Connecting(ref mut tcp_stream_new) => {
+                let tcp_stream = try_ready!(tcp_stream_new.poll());
+                let xmpp_stream = tcp_stream.framed(XMPPCodec::new());
+                let send = xmpp_stream.send(Packet::StreamStart);
+                let new_state = TcpClientState::SendStart(send);
+                (new_state, Ok(Async::NotReady))
+            },
+            TcpClientState::SendStart(ref mut send) => {
+                let xmpp_stream = try_ready!(send.poll());
+                let new_state = TcpClientState::RecvStart(Some(xmpp_stream));
+                (new_state, Ok(Async::NotReady))
+            },
+            TcpClientState::RecvStart(ref mut opt_xmpp_stream) => {
+                let mut xmpp_stream = opt_xmpp_stream.take().unwrap();
+                match xmpp_stream.poll() {
+                    Ok(Async::Ready(Some(events))) => println!("Recv start: {:?}", events),
+                    Ok(Async::Ready(_)) => return Err(std::io::Error::from(ErrorKind::InvalidData)),
+                    Ok(Async::NotReady) => {
+                        *opt_xmpp_stream = Some(xmpp_stream);
+                        return Ok(Async::NotReady);
+                    },
+                    Err(e) => return Err(e)
+                };
+                let new_state = TcpClientState::Established;
+                (new_state, Ok(Async::Ready(xmpp_stream)))
+            },
+            TcpClientState::Established | TcpClientState::Invalid =>
+                unreachable!(),
+        };
+
+        println!("Next state: {:?}", new_state);
+        self.state = new_state;
+	match result {
+	    // by polling again, we register new future
+	    Ok(Async::NotReady) => self.poll(),
+	    result => result
+	}
     }
 }
 
 #[cfg(test)]
 mod tests {
     use tokio_core::reactor::Core;
+    use futures::{Future, Stream};
 
     #[test]
     fn it_works() {
+        use std::net::ToSocketAddrs;
+        let addr = "[2a01:4f8:a0:33d0::5]:5222"
+            .to_socket_addrs().unwrap()
+            .next().unwrap();
+
         let mut core = Core::new().unwrap();
-        let client = super::ClientStream::connect(
-            "astro@spaceboyz.net",
-            "...",
+        let client = super::TcpClient::connect(
+            &addr,
             &core.handle()
         ).and_then(|stream| {
-            stream.inner.source.boxed().for_each(|item| {
+            stream.for_each(|item| {
                 Ok(println!("stream item: {:?}", item))
             })
-        }).boxed();
+        });
         core.run(client).unwrap();
     }
 

src/xmpp_codec.rs 🔗

@@ -2,7 +2,7 @@ use std;
 use std::str::from_utf8;
 use std::io::{Error, ErrorKind};
 use std::collections::HashMap;
-use tokio_core::io::{Codec, EasyBuf};
+use tokio_core::io::{Codec, EasyBuf, Framed};
 use xml;
 
 const NS_XMLNS: &'static str = "http://www.w3.org/2000/xmlns/";
@@ -51,6 +51,8 @@ pub enum Packet {
     StreamEnd,
 }
 
+pub type XMPPStream<T> = Framed<T, XMPPCodec>;
+
 pub struct XMPPCodec {
     parser: xml::Parser,
     root: Option<XMPPRoot>,
@@ -70,6 +72,7 @@ impl Codec for XMPPCodec {
     type Out = Packet;
 
     fn decode(&mut self, buf: &mut EasyBuf) -> Result<Option<Self::In>, Error> {
+        println!("XMPPCodec.decode {:?}", buf.len());
         match from_utf8(buf.as_slice()) {
             Ok(s) =>
                 self.parser.feed_str(s),