impl stream for client

Astro created

Change summary

examples/echo_bot.rs | 124 +++++++++++++++++++++++-----------
src/client/auth.rs   |   0 
src/client/bind.rs   |   0 
src/client/mod.rs    | 167 ++++++++++++++++++++++++++++++++++++++++++++++
src/lib.rs           |   6 -
src/starttls.rs      |   1 
src/xmpp_stream.rs   |  26 -------
7 files changed, 254 insertions(+), 70 deletions(-)

Detailed changes

examples/echo_bot.rs 🔗

@@ -6,53 +6,85 @@ extern crate xml;
 
 use std::str::FromStr;
 use tokio_core::reactor::Core;
-use futures::{Future, Stream, Sink};
-use jid::Jid;
-use tokio_xmpp::TcpClient;
+use futures::{Future, Stream, Sink, future};
+use tokio_xmpp::{Client, ClientEvent};
 use tokio_xmpp::xmpp_codec::Packet;
 
 fn main() {
-    let jid = Jid::from_str("astrobot@example.net").expect("JID");
-    let password = "".to_owned();
+    let mut core = Core::new().unwrap();
+    let client = Client::new("astrobot@example.org", "", &core.handle()).unwrap();
+    // let client = TcpClient::connect(
+    //     jid.clone(),
+    //     &addr,
+    //     &core.handle()
+    // ).map_err(|e| format!("{}", e)
+    // ).and_then(|stream| {
+    //     if stream.can_starttls() {
+    //         stream.starttls()
+    //     } else {
+    //         panic!("No STARTTLS")
+    //     }
+    // }).and_then(|stream| {
+    //     let username = jid.node.as_ref().unwrap().to_owned();
+    //     stream.auth(username, password).expect("auth")
+    // }).and_then(|stream| {
+    //     stream.bind()
+    // }).and_then(|stream| {
+    //     println!("Bound to {}", stream.jid);
 
-    use std::net::ToSocketAddrs;
-    let addr = "[2a01:4f8:a0:33d0::5]:5222"
-        .to_socket_addrs().unwrap()
-        .next().unwrap();
+    //     let presence = xml::Element::new("presence".to_owned(), None, vec![]);
+    //     stream.send(Packet::Stanza(presence))
+    //         .map_err(|e| format!("{}", e))
+    // }).and_then(|stream| {
+    //     let main_loop = |stream| {
+    //         stream.into_future()
+    //             .and_then(|(event, stream)| {
+    //                 stream.send(Packet::Stanza(unreachable!()))
+    //             }).and_then(main_loop)
+    //     };
+    //     main_loop(stream)
+    // }).and_then(|(event, stream)| {
+    //     let (mut sink, stream) = stream.split();
+    //     stream.for_each(move |event| {
+    //         match event {
+    //             Packet::Stanza(ref message)
+    //                 if message.name == "message" => {
+    //                     let ty = message.get_attribute("type", None);
+    //                     let body = message.get_child("body", Some("jabber:client"))
+    //                         .map(|body_el| body_el.content_str());
+    //                     match ty {
+    //                         None | Some("normal") | Some("chat")
+    //                             if body.is_some() => {
+    //                                 let from = message.get_attribute("from", None).unwrap();
+    //                                 println!("Got message from {}: {:?}", from, body);
+    //                                 let reply = make_reply(from, body.unwrap());
+    //                                 sink.send(Packet::Stanza(reply))
+    //                                     .and_then(|_| Ok(()))
+    //                             },
+    //                         _ => future::ok(()),
+    //                     }
+    //                 },
+    //             _ => future::ok(()),
+    //         }
+    //     }).map_err(|e| format!("{}", e))
+    // });
 
-    let mut core = Core::new().unwrap();
-    let client = TcpClient::connect(
-        jid.clone(),
-        &addr,
-        &core.handle()
-    ).map_err(|e| format!("{}", e)
-    ).and_then(|stream| {
-        if stream.can_starttls() {
-            stream.starttls()
-        } else {
-            panic!("No STARTTLS")
+    let done = client.for_each(|event| {
+        match event {
+            ClientEvent::Online => {
+                println!("Online!");
+            },
+            ClientEvent::Stanza(stanza) => {
+            },
+            _ => {
+                println!("Event: {:?}", event);
+            },
         }
-    }).and_then(|stream| {
-        let username = jid.node.as_ref().unwrap().to_owned();
-        stream.auth(username, password).expect("auth")
-    }).and_then(|stream| {
-        stream.bind()
-    }).and_then(|stream| {
-        println!("Bound to {}", stream.jid);
-
-        let presence = xml::Element::new("presence".to_owned(), None, vec![]);
-        stream.send(Packet::Stanza(presence))
-            .map_err(|e| format!("{}", e))
-    }).and_then(|stream| {
-        stream.for_each(|event| {
-            match event {
-                Packet::Stanza(el) => println!("<< {}", el),
-                _ => println!("!! {:?}", event),
-            }
-            Ok(())
-        }).map_err(|e| format!("{}", e))
+        
+        Ok(())
     });
-    match core.run(client) {
+    
+    match core.run(done) {
         Ok(_) => (),
         Err(e) => {
             println!("Fatal: {}", e);
@@ -60,3 +92,15 @@ fn main() {
         }
     }
 }
+
+fn make_reply(to: &str, body: String) -> xml::Element {
+    let mut message = xml::Element::new(
+        "message".to_owned(),
+        None,
+        vec![("type".to_owned(), None, "chat".to_owned()),
+             ("to".to_owned(), None, to.to_owned())]
+    );
+    message.tag(xml::Element::new("body".to_owned(), None, vec![]))
+        .text(body);
+    message
+}

src/client/mod.rs 🔗

@@ -0,0 +1,167 @@
+use std::mem::replace;
+use std::str::FromStr;
+use std::error::Error;
+use tokio_core::reactor::{Core, Handle};
+use tokio_core::net::TcpStream;
+use tokio_io::{AsyncRead, AsyncWrite};
+use tokio_tls::TlsStream;
+use futures::*;
+use jid::{Jid, JidParseError};
+use xml;
+use sasl::common::{Credentials, ChannelBinding};
+
+use super::xmpp_codec::Packet;
+use super::xmpp_stream;
+use super::tcp::TcpClient;
+use super::starttls::{NS_XMPP_TLS, StartTlsClient};
+
+mod auth;
+use self::auth::*;
+mod bind;
+use self::bind::*;
+
+pub struct Client {
+    pub jid: Jid,
+    password: String,
+    state: ClientState,
+}
+
+type XMPPStream = xmpp_stream::XMPPStream<TlsStream<TcpStream>>;
+
+enum ClientState {
+    Invalid,
+    Disconnected,
+    Connecting(Box<Future<Item=XMPPStream, Error=String>>),
+    Connected(XMPPStream),
+    // Sending,
+    // Drain,
+}
+
+impl Client {
+    pub fn new(jid: &str, password: &str, handle: &Handle) -> Result<Self, JidParseError> {
+        let jid = try!(Jid::from_str(jid));
+        let password = password.to_owned();
+        let connect = Self::make_connect(jid.clone(), password.clone(), handle);
+        Ok(Client {
+            jid, password,
+            state: ClientState::Connecting(connect),
+        })
+    }
+
+    fn make_connect(jid: Jid, password: String, handle: &Handle) -> Box<Future<Item=XMPPStream, Error=String>> {
+        use std::net::ToSocketAddrs;
+        let addr = "89.238.79.220:5222"
+            .to_socket_addrs().unwrap()
+            .next().unwrap();
+        let username = jid.node.as_ref().unwrap().to_owned();
+        let password = password;
+        Box::new(
+            TcpClient::connect(
+                jid,
+                &addr,
+                handle
+            ).map_err(|e| format!("{}", e)
+            ).and_then(|stream| {
+                if Self::can_starttls(&stream) {
+                    Self::starttls(stream)
+                } else {
+                    panic!("No STARTTLS")
+                }
+            }).and_then(move |stream| {
+                Self::auth(stream, username, password).expect("auth")
+            }).and_then(|stream| {
+                Self::bind(stream)
+            }).and_then(|stream| {
+                println!("Bound to {}", stream.jid);
+
+                let presence = xml::Element::new("presence".to_owned(), None, vec![]);
+                stream.send(Packet::Stanza(presence))
+                    .map_err(|e| format!("{}", e))
+            })
+        )
+    }
+
+    fn can_starttls<S>(stream: &xmpp_stream::XMPPStream<S>) -> bool {
+        stream.stream_features
+            .get_child("starttls", Some(NS_XMPP_TLS))
+            .is_some()
+    }
+
+    fn starttls<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> StartTlsClient<S> {
+        StartTlsClient::from_stream(stream)
+    }
+
+    fn auth<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>, username: String, password: String) -> Result<ClientAuth<S>, String> {
+        let creds = Credentials::default()
+            .with_username(username)
+            .with_password(password)
+            .with_channel_binding(ChannelBinding::None);
+        ClientAuth::new(stream, creds)
+    }
+
+    fn bind<S: AsyncWrite>(stream: xmpp_stream::XMPPStream<S>) -> ClientBind<S> {
+        ClientBind::new(stream)
+    }
+}
+
+#[derive(Debug)]
+pub enum ClientEvent {
+    Online,
+    Disconnected,
+    Stanza(xml::Element),
+}
+
+impl Stream for Client {
+    type Item = ClientEvent;
+    type Error = String;
+
+    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+        println!("stream.poll");
+        let state = replace(&mut self.state, ClientState::Invalid);
+
+        match state {
+            ClientState::Invalid =>
+                Err("invalid client state".to_owned()),
+            ClientState::Disconnected =>
+                Ok(Async::NotReady),
+            ClientState::Connecting(mut connect) => {
+                match connect.poll() {
+                    Ok(Async::Ready(stream)) => {
+                        println!("connected");
+                        self.state = ClientState::Connected(stream);
+                        self.poll()
+                    },
+                    Ok(Async::NotReady) => {
+                        self.state = ClientState::Connecting(connect);
+                        Ok(Async::NotReady)
+                    },
+                    Err(e) =>
+                        Err(e),
+                }
+            },
+            ClientState::Connected(mut stream) => {
+                match stream.poll() {
+                    Ok(Async::NotReady) => {
+                        self.state = ClientState::Connected(stream);
+                        Ok(Async::NotReady)
+                    },
+                    Ok(Async::Ready(None)) => {
+                        // EOF
+                        self.state = ClientState::Disconnected;
+                        Ok(Async::Ready(Some(ClientEvent::Disconnected)))
+                    },
+                    Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
+                        self.state = ClientState::Connected(stream);
+                        Ok(Async::Ready(Some(ClientEvent::Stanza(stanza))))
+                    },
+                    Ok(Async::Ready(_)) => {
+                        self.state = ClientState::Connected(stream);
+                        Ok(Async::NotReady)
+                    },
+                    Err(e) =>
+                        Err(e.description().to_owned()),
+                }
+            },
+        }
+    }
+}

src/lib.rs 🔗

@@ -18,10 +18,8 @@ mod tcp;
 pub use tcp::*;
 mod starttls;
 pub use starttls::*;
-mod client_auth;
-pub use client_auth::*;
-mod client_bind;
-pub use client_bind::*;
+mod client;
+pub use client::{Client, ClientEvent};
 
 
 // type FullClient = sasl::Client<StartTLS<TCPConnection>>

src/starttls.rs 🔗

@@ -15,6 +15,7 @@ use stream_start::StreamStart;
 
 pub const NS_XMPP_TLS: &str = "urn:ietf:params:xml:ns:xmpp-tls";
 
+
 pub struct StartTlsClient<S: AsyncRead + AsyncWrite> {
     state: StartTlsClientState<S>,
     jid: Jid,

src/xmpp_stream.rs 🔗

@@ -4,14 +4,10 @@ use futures::*;
 use tokio_io::{AsyncRead, AsyncWrite};
 use tokio_io::codec::Framed;
 use xml;
-use sasl::common::{Credentials, ChannelBinding};
 use jid::Jid;
 
 use xmpp_codec::*;
 use stream_start::*;
-use starttls::{NS_XMPP_TLS, StartTlsClient};
-use client_auth::ClientAuth;
-use client_bind::ClientBind;
 
 pub const NS_XMPP_STREAM: &str = "http://etherx.jabber.org/streams";
 
@@ -42,28 +38,6 @@ impl<S: AsyncRead + AsyncWrite> XMPPStream<S> {
     pub fn restart(self) -> StreamStart<S> {
         Self::from_stream(self.stream.into_inner(), self.jid)
     }
-
-    pub fn can_starttls(&self) -> bool {
-        self.stream_features
-            .get_child("starttls", Some(NS_XMPP_TLS))
-            .is_some()
-    }
-
-    pub fn starttls(self) -> StartTlsClient<S> {
-        StartTlsClient::from_stream(self)
-    }
-
-    pub fn auth(self, username: String, password: String) -> Result<ClientAuth<S>, String> {
-        let creds = Credentials::default()
-            .with_username(username)
-            .with_password(password)
-            .with_channel_binding(ChannelBinding::None);
-        ClientAuth::new(self, creds)
-    }
-
-    pub fn bind(self) -> ClientBind<S> {
-        ClientBind::new(self)
-    }
 }
 
 /// Proxy to self.stream