impl Sink for Client + complete echo_bot

Astro created

Change summary

examples/echo_bot.rs | 114 ++++++++++++++++++++-------------------------
src/client/mod.rs    |  52 +++++++++++++++-----
src/xmpp_stream.rs   |   1 
3 files changed, 89 insertions(+), 78 deletions(-)

Detailed changes

examples/echo_bot.rs 🔗

@@ -4,84 +4,61 @@ extern crate tokio_xmpp;
 extern crate jid;
 extern crate xml;
 
-use std::str::FromStr;
 use tokio_core::reactor::Core;
 use futures::{Future, Stream, Sink, future};
 use tokio_xmpp::{Client, ClientEvent};
-use tokio_xmpp::xmpp_codec::Packet;
 
 fn main() {
     let mut core = Core::new().unwrap();
     let client = Client::new("astrobot@example.org", "", &core.handle()).unwrap();
-    // let client = TcpClient::connect(
-    //     jid.clone(),
-    //     &addr,
-    //     &core.handle()
-    // ).map_err(|e| format!("{}", e)
-    // ).and_then(|stream| {
-    //     if stream.can_starttls() {
-    //         stream.starttls()
-    //     } else {
-    //         panic!("No STARTTLS")
-    //     }
-    // }).and_then(|stream| {
-    //     let username = jid.node.as_ref().unwrap().to_owned();
-    //     stream.auth(username, password).expect("auth")
-    // }).and_then(|stream| {
-    //     stream.bind()
-    // }).and_then(|stream| {
-    //     println!("Bound to {}", stream.jid);
 
-    //     let presence = xml::Element::new("presence".to_owned(), None, vec![]);
-    //     stream.send(Packet::Stanza(presence))
-    //         .map_err(|e| format!("{}", e))
-    // }).and_then(|stream| {
-    //     let main_loop = |stream| {
-    //         stream.into_future()
-    //             .and_then(|(event, stream)| {
-    //                 stream.send(Packet::Stanza(unreachable!()))
-    //             }).and_then(main_loop)
-    //     };
-    //     main_loop(stream)
-    // }).and_then(|(event, stream)| {
-    //     let (mut sink, stream) = stream.split();
-    //     stream.for_each(move |event| {
-    //         match event {
-    //             Packet::Stanza(ref message)
-    //                 if message.name == "message" => {
-    //                     let ty = message.get_attribute("type", None);
-    //                     let body = message.get_child("body", Some("jabber:client"))
-    //                         .map(|body_el| body_el.content_str());
-    //                     match ty {
-    //                         None | Some("normal") | Some("chat")
-    //                             if body.is_some() => {
-    //                                 let from = message.get_attribute("from", None).unwrap();
-    //                                 println!("Got message from {}: {:?}", from, body);
-    //                                 let reply = make_reply(from, body.unwrap());
-    //                                 sink.send(Packet::Stanza(reply))
-    //                                     .and_then(|_| Ok(()))
-    //                             },
-    //                         _ => future::ok(()),
-    //                     }
-    //                 },
-    //             _ => future::ok(()),
-    //         }
-    //     }).map_err(|e| format!("{}", e))
-    // });
-
-    let done = client.for_each(|event| {
-        match event {
+    let (sink, stream) = client.split();
+    let mut sink = Some(sink);
+    let done = stream.for_each(move |event| {
+        let result: Box<Future<Item=(), Error=String>> = match event {
             ClientEvent::Online => {
                 println!("Online!");
+
+                let presence = make_presence();
+                sink = Some(
+                    sink.take().
+                        expect("sink")
+                        .send(presence)
+                        .wait()
+                        .expect("sink.send")
+                );
+                Box::new(
+                    future::ok(())
+                )
             },
-            ClientEvent::Stanza(stanza) => {
+            ClientEvent::Stanza(ref stanza)
+                if stanza.name == "message"
+                && stanza.get_attribute("type", None) != Some("error") =>
+            {
+                let from = stanza.get_attribute("from", None);
+                let body = stanza.get_child("body", Some("jabber:client"))
+                    .map(|el| el.content_str());
+
+                match (from.as_ref(), body) {
+                    (Some(from), Some(body)) => {
+                        let reply = make_reply(from, body);
+                        sink = Some(
+                            sink.take().
+                                expect("sink")
+                                .send(reply)
+                                .wait()
+                                .expect("sink.send")
+                        );
+                    },
+                    _ => (),
+                };
+                Box::new(future::ok(()))
             },
             _ => {
-                println!("Event: {:?}", event);
+                Box::new(future::ok(()))
             },
-        }
-        
-        Ok(())
+        };
+        result
     });
     
     match core.run(done) {
@@ -93,6 +70,15 @@ fn main() {
     }
 }
 
+fn make_presence() -> xml::Element {
+    let mut presence = xml::Element::new("presence".to_owned(), None, vec![]);
+    presence.tag(xml::Element::new("status".to_owned(), None, vec![]))
+        .text("chat".to_owned());
+    presence.tag(xml::Element::new("show".to_owned(), None, vec![]))
+        .text("Echoing messages".to_owned());
+    presence
+}
+
 fn make_reply(to: &str, body: String) -> xml::Element {
     let mut message = xml::Element::new(
         "message".to_owned(),

src/client/mod.rs 🔗

@@ -1,7 +1,7 @@
 use std::mem::replace;
 use std::str::FromStr;
 use std::error::Error;
-use tokio_core::reactor::{Core, Handle};
+use tokio_core::reactor::Handle;
 use tokio_core::net::TcpStream;
 use tokio_io::{AsyncRead, AsyncWrite};
 use tokio_tls::TlsStream;
@@ -22,7 +22,6 @@ use self::bind::*;
 
 pub struct Client {
     pub jid: Jid,
-    password: String,
     state: ClientState,
 }
 
@@ -33,8 +32,6 @@ enum ClientState {
     Disconnected,
     Connecting(Box<Future<Item=XMPPStream, Error=String>>),
     Connected(XMPPStream),
-    // Sending,
-    // Drain,
 }
 
 impl Client {
@@ -43,7 +40,7 @@ impl Client {
         let password = password.to_owned();
         let connect = Self::make_connect(jid.clone(), password.clone(), handle);
         Ok(Client {
-            jid, password,
+            jid,
             state: ClientState::Connecting(connect),
         })
     }
@@ -73,10 +70,7 @@ impl Client {
                 Self::bind(stream)
             }).and_then(|stream| {
                 println!("Bound to {}", stream.jid);
-
-                let presence = xml::Element::new("presence".to_owned(), None, vec![]);
-                stream.send(Packet::Stanza(presence))
-                    .map_err(|e| format!("{}", e))
+                Ok(stream)
             })
         )
     }
@@ -116,20 +110,18 @@ impl Stream for Client {
     type Error = String;
 
     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
-        println!("stream.poll");
         let state = replace(&mut self.state, ClientState::Invalid);
 
         match state {
             ClientState::Invalid =>
                 Err("invalid client state".to_owned()),
             ClientState::Disconnected =>
-                Ok(Async::NotReady),
+                Ok(Async::Ready(None)),
             ClientState::Connecting(mut connect) => {
                 match connect.poll() {
                     Ok(Async::Ready(stream)) => {
-                        println!("connected");
                         self.state = ClientState::Connected(stream);
-                        self.poll()
+                        Ok(Async::Ready(Some(ClientEvent::Online)))
                     },
                     Ok(Async::NotReady) => {
                         self.state = ClientState::Connecting(connect);
@@ -165,3 +157,37 @@ impl Stream for Client {
         }
     }
 }
+
+impl Sink for Client {
+    type SinkItem = xml::Element;
+    type SinkError = String;
+
+    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
+        match self.state {
+            ClientState::Connected(ref mut stream) =>
+                match stream.start_send(Packet::Stanza(item)) {
+                    Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) =>
+                        Ok(AsyncSink::NotReady(stanza)),
+                    Ok(AsyncSink::NotReady(_)) =>
+                        panic!("Client.start_send with stanza but got something else back"),
+                    Ok(AsyncSink::Ready) => {
+                        Ok(AsyncSink::Ready)
+                    },
+                    Err(e) =>
+                        Err(e.description().to_owned()),
+                },
+            _ =>
+                Ok(AsyncSink::NotReady(item)),
+        }
+    }
+
+    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+        match &mut self.state {
+            &mut ClientState::Connected(ref mut stream) =>
+                stream.poll_complete()
+                .map_err(|e| e.description().to_owned()),
+            _ =>
+                Ok(Async::Ready(())),
+        }
+    }
+}

src/xmpp_stream.rs 🔗

@@ -1,4 +1,3 @@
-use std::default::Default;
 use std::collections::HashMap;
 use futures::*;
 use tokio_io::{AsyncRead, AsyncWrite};