move stream_start out of places

Astro created

Change summary

src/client/mod.rs   | 31 ++++++++++++--------
src/lib.rs          |  3 --
src/starttls.rs     | 22 ++-------------
src/stream_start.rs | 30 +++++++++++--------
src/tcp.rs          | 68 -----------------------------------------------
src/xmpp_codec.rs   |  4 +-
src/xmpp_stream.rs  | 13 ++++----
7 files changed, 46 insertions(+), 125 deletions(-)

Detailed changes

src/client/mod.rs 🔗

@@ -12,7 +12,6 @@ use sasl::common::{Credentials, ChannelBinding};
 
 use super::xmpp_codec::Packet;
 use super::xmpp_stream;
-use super::tcp::TcpClient;
 use super::starttls::{NS_XMPP_TLS, StartTlsClient};
 use super::happy_eyeballs::Connecter;
 
@@ -29,6 +28,7 @@ pub struct Client {
 }
 
 type XMPPStream = xmpp_stream::XMPPStream<TlsStream<TcpStream>>;
+const NS_JABBER_CLIENT: &str = "jabber:client";
 
 enum ClientState {
     Invalid,
@@ -50,26 +50,31 @@ impl Client {
 
     fn make_connect(jid: Jid, password: String, handle: Handle) -> Box<Future<Item=XMPPStream, Error=String>> {
         let username = jid.node.as_ref().unwrap().to_owned();
+        let jid1 = jid.clone();
+        let jid2 = jid.clone();
         let password = password;
         Box::new(
             Connecter::from_lookup(handle, &jid.domain, "_xmpp-client._tcp", 5222)
                 .expect("Connector::from_lookup")
-                .and_then(|tcp_stream|
-                          TcpClient::from_stream(jid, tcp_stream)
+                .and_then(move |tcp_stream|
+                          xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_CLIENT.to_owned())
                           .map_err(|e| format!("{}", e))
-                ).and_then(|stream| {
-                    if Self::can_starttls(&stream) {
-                        Self::starttls(stream)
+                ).and_then(|xmpp_stream| {
+                    if Self::can_starttls(&xmpp_stream) {
+                        Self::starttls(xmpp_stream)
                     } else {
                         panic!("No STARTTLS")
                     }
-                }).and_then(move |stream| {
-                    Self::auth(stream, username, password).expect("auth")
-                }).and_then(|stream| {
-                    Self::bind(stream)
-                }).and_then(|stream| {
-                    println!("Bound to {}", stream.jid);
-                    Ok(stream)
+                }).and_then(|tls_stream|
+                          XMPPStream::start(tls_stream, jid2, NS_JABBER_CLIENT.to_owned())
+                          .map_err(|e| format!("{}", e))
+                ).and_then(move |xmpp_stream| {
+                    Self::auth(xmpp_stream, username, password).expect("auth")
+                }).and_then(|xmpp_stream| {
+                    Self::bind(xmpp_stream)
+                }).and_then(|xmpp_stream| {
+                    println!("Bound to {}", xmpp_stream.jid);
+                    Ok(xmpp_stream)
                 })
         )
     }

src/lib.rs 🔗

@@ -1,4 +1,3 @@
-#[macro_use]
 extern crate futures;
 extern crate tokio_core;
 extern crate tokio_io;
@@ -16,8 +15,6 @@ extern crate domain;
 pub mod xmpp_codec;
 pub mod xmpp_stream;
 mod stream_start;
-mod tcp;
-pub use tcp::TcpClient;
 mod starttls;
 pub use starttls::StartTlsClient;
 mod happy_eyeballs;

src/starttls.rs 🔗

@@ -10,7 +10,6 @@ use jid::Jid;
 
 use xmpp_codec::Packet;
 use xmpp_stream::XMPPStream;
-use stream_start::StreamStart;
 
 
 pub const NS_XMPP_TLS: &str = "urn:ietf:params:xml:ns:xmpp-tls";
@@ -26,7 +25,6 @@ enum StartTlsClientState<S: AsyncRead + AsyncWrite> {
     SendStartTls(sink::Send<XMPPStream<S>>),
     AwaitProceed(XMPPStream<S>),
     StartingTls(ConnectAsync<S>),
-    Start(StreamStart<TlsStream<S>>),
 }
 
 impl<S: AsyncRead + AsyncWrite> StartTlsClient<S> {
@@ -48,7 +46,7 @@ impl<S: AsyncRead + AsyncWrite> StartTlsClient<S> {
 }
 
 impl<S: AsyncRead + AsyncWrite> Future for StartTlsClient<S> {
-    type Item = XMPPStream<TlsStream<S>>;
+    type Item = TlsStream<S>;
     type Error = String;
 
     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@@ -92,24 +90,10 @@ impl<S: AsyncRead + AsyncWrite> Future for StartTlsClient<S> {
                 },
             StartTlsClientState::StartingTls(mut connect) =>
                 match connect.poll() {
-                    Ok(Async::Ready(tls_stream)) => {
-                        println!("TLS stream established");
-                        let start = XMPPStream::from_stream(tls_stream, self.jid.clone());
-                        let new_state = StartTlsClientState::Start(start);
-                        retry = true;
-                        (new_state, Ok(Async::NotReady))
-                    },
+                    Ok(Async::Ready(tls_stream)) =>
+                        (StartTlsClientState::Invalid, Ok(Async::Ready(tls_stream))),
                     Ok(Async::NotReady) =>
                         (StartTlsClientState::StartingTls(connect), Ok(Async::NotReady)),
-                    Err(e) =>
-                        (StartTlsClientState::StartingTls(connect),  Err(format!("{}", e))),
-                },
-            StartTlsClientState::Start(mut start) =>
-                match start.poll() {
-                    Ok(Async::Ready(xmpp_stream)) =>
-                        (StartTlsClientState::Invalid, Ok(Async::Ready(xmpp_stream))),
-                    Ok(Async::NotReady) =>
-                        (StartTlsClientState::Start(start), Ok(Async::NotReady)),
                     Err(e) =>
                         (StartTlsClientState::Invalid, Err(format!("{}", e))),
                 },

src/stream_start.rs 🔗

@@ -1,6 +1,5 @@
 use std::mem::replace;
 use std::io::{Error, ErrorKind};
-use std::collections::HashMap;
 use futures::{Future, Async, Poll, Stream, sink, Sink};
 use tokio_io::{AsyncRead, AsyncWrite};
 use tokio_io::codec::Framed;
@@ -14,20 +13,21 @@ const NS_XMPP_STREAM: &str = "http://etherx.jabber.org/streams";
 pub struct StreamStart<S: AsyncWrite> {
     state: StreamStartState<S>,
     jid: Jid,
+    ns: String,
 }
 
 enum StreamStartState<S: AsyncWrite> {
     SendStart(sink::Send<Framed<S, XMPPCodec>>),
     RecvStart(Framed<S, XMPPCodec>),
-    RecvFeatures(Framed<S, XMPPCodec>, HashMap<String, String>),
+    RecvFeatures(Framed<S, XMPPCodec>, String),
     Invalid,
 }
 
 impl<S: AsyncWrite> StreamStart<S> {
-    pub fn from_stream(stream: Framed<S, XMPPCodec>, jid: Jid) -> Self {
+    pub fn from_stream(stream: Framed<S, XMPPCodec>, jid: Jid, ns: String) -> Self {
         let attrs = [("to".to_owned(), jid.domain.clone()),
                      ("version".to_owned(), "1.0".to_owned()),
-                     ("xmlns".to_owned(), "jabber:client".to_owned()),
+                     ("xmlns".to_owned(), ns.clone()),
                      ("xmlns:stream".to_owned(), NS_XMPP_STREAM.to_owned()),
         ].iter().cloned().collect();
         let send = stream.send(Packet::StreamStart(attrs));
@@ -35,6 +35,7 @@ impl<S: AsyncWrite> StreamStart<S> {
         StreamStart {
             state: StreamStartState::SendStart(send),
             jid,
+            ns,
         }
     }
 }
@@ -63,8 +64,13 @@ impl<S: AsyncRead + AsyncWrite> Future for StreamStart<S> {
                 match stream.poll() {
                     Ok(Async::Ready(Some(Packet::StreamStart(stream_attrs)))) => {
                         retry = true;
+                        let stream_ns = match stream_attrs.get("xmlns") {
+                            Some(ns) => ns.clone(),
+                            None =>
+                                return Err(Error::from(ErrorKind::InvalidData)),
+                        };
                         // TODO: skip RecvFeatures for version < 1.0
-                        (StreamStartState::RecvFeatures(stream, stream_attrs), Ok(Async::NotReady))
+                        (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady))
                     },
                     Ok(Async::Ready(_)) =>
                         return Err(Error::from(ErrorKind::InvalidData)),
@@ -73,22 +79,20 @@ impl<S: AsyncRead + AsyncWrite> Future for StreamStart<S> {
                     Err(e) =>
                         return Err(e),
                 },
-            StreamStartState::RecvFeatures(mut stream, stream_attrs) =>
+            StreamStartState::RecvFeatures(mut stream, stream_ns) =>
                 match stream.poll() {
                     Ok(Async::Ready(Some(Packet::Stanza(stanza)))) =>
                         if stanza.name() == "features"
                         && stanza.ns() == Some(NS_XMPP_STREAM) {
-                            let stream = XMPPStream::new(self.jid.clone(), stream, stream_attrs, stanza);
+                            let stream = XMPPStream::new(self.jid.clone(), stream, self.ns.clone(), stanza);
                             (StreamStartState::Invalid, Ok(Async::Ready(stream)))
                         } else {
-                            (StreamStartState::RecvFeatures(stream, stream_attrs), Ok(Async::NotReady))
+                            (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady))
                         },
-                    Ok(Async::Ready(item)) => {
-                        println!("StreamStart skip {:?}", item);
-                        (StreamStartState::RecvFeatures(stream, stream_attrs), Ok(Async::NotReady))
-                    },
+                    Ok(Async::Ready(item)) =>
+                        (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady)),
                     Ok(Async::NotReady) =>
-                        (StreamStartState::RecvFeatures(stream, stream_attrs), Ok(Async::NotReady)),
+                        (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady)),
                     Err(e) =>
                         return Err(e),
                 },

src/tcp.rs 🔗

@@ -1,68 +0,0 @@
-use std::net::SocketAddr;
-use std::io::Error;
-use futures::{Future, Poll, Async};
-use tokio_core::reactor::Handle;
-use tokio_core::net::{TcpStream, TcpStreamNew};
-use jid::Jid;
-
-use xmpp_stream::XMPPStream;
-use stream_start::StreamStart;
-
-pub struct TcpClient {
-    state: TcpClientState,
-    jid: Jid,
-}
-
-enum TcpClientState {
-    Connecting(TcpStreamNew),
-    Start(StreamStart<TcpStream>),
-    Established,
-}
-
-impl TcpClient {
-    pub fn connect(jid: Jid, addr: &SocketAddr, handle: &Handle) -> Self {
-        let tcp_stream_new = TcpStream::connect(addr, handle);
-        TcpClient {
-            state: TcpClientState::Connecting(tcp_stream_new),
-            jid,
-        }
-    }
-
-    pub fn from_stream(jid: Jid, tcp_stream: TcpStream) -> Self {
-        let start = XMPPStream::from_stream(tcp_stream, jid.clone());
-        TcpClient {
-            state: TcpClientState::Start(start),
-            jid,
-        }
-    }
-}
-
-impl Future for TcpClient {
-    type Item = XMPPStream<TcpStream>;
-    type Error = Error;
-
-    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
-        let (new_state, result) = match self.state {
-            TcpClientState::Connecting(ref mut tcp_stream_new) => {
-                let tcp_stream = try_ready!(tcp_stream_new.poll());
-                let start = XMPPStream::from_stream(tcp_stream, self.jid.clone());
-                let new_state = TcpClientState::Start(start);
-                (new_state, Ok(Async::NotReady))
-            },
-            TcpClientState::Start(ref mut start) => {
-                let xmpp_stream = try_ready!(start.poll());
-                let new_state = TcpClientState::Established;
-                (new_state, Ok(Async::Ready(xmpp_stream)))
-            },
-            TcpClientState::Established =>
-                unreachable!(),
-        };
-
-        self.state = new_state;
-        match result {
-            // by polling again, we register new future
-            Ok(Async::NotReady) => self.poll(),
-            result => result
-        }
-    }
-}

src/xmpp_codec.rs 🔗

@@ -98,8 +98,8 @@ impl ParserSink {
 
         if self.stack.is_empty() {
             let attrs = HashMap::from_iter(
-                el.attrs()
-                    .map(|(name, value)| (name.to_owned(), value.to_owned()))
+                tag.attrs.iter()
+                    .map(|attr| (attr.name.local.as_ref().to_owned(), attr.value.as_ref().to_owned()))
             );
             self.push_queue(Packet::StreamStart(attrs));
         }

src/xmpp_stream.rs 🔗

@@ -1,4 +1,3 @@
-use std::collections::HashMap;
 use futures::{Poll, Stream, Sink, StartSend};
 use tokio_io::{AsyncRead, AsyncWrite};
 use tokio_io::codec::Framed;
@@ -13,21 +12,21 @@ pub const NS_XMPP_STREAM: &str = "http://etherx.jabber.org/streams";
 pub struct XMPPStream<S> {
     pub jid: Jid,
     pub stream: Framed<S, XMPPCodec>,
-    pub stream_attrs: HashMap<String, String>,
     pub stream_features: Element,
+    pub ns: String,
 }
 
 impl<S: AsyncRead + AsyncWrite> XMPPStream<S> {
     pub fn new(jid: Jid,
                stream: Framed<S, XMPPCodec>,
-               stream_attrs: HashMap<String, String>,
+               ns: String,
                stream_features: Element) -> Self {
-        XMPPStream { jid, stream, stream_attrs, stream_features }
+        XMPPStream { jid, stream, stream_features, ns }
     }
 
-    pub fn from_stream(stream: S, jid: Jid) -> StreamStart<S> {
+    pub fn start(stream: S, jid: Jid, ns: String) -> StreamStart<S> {
         let xmpp_stream = AsyncRead::framed(stream, XMPPCodec::new());
-        StreamStart::from_stream(xmpp_stream, jid)
+        StreamStart::from_stream(xmpp_stream, jid, ns)
     }
 
     pub fn into_inner(self) -> S {
@@ -35,7 +34,7 @@ impl<S: AsyncRead + AsyncWrite> XMPPStream<S> {
     }
 
     pub fn restart(self) -> StreamStart<S> {
-        Self::from_stream(self.stream.into_inner(), self.jid)
+        Self::start(self.stream.into_inner(), self.jid, self.ns)
     }
 }