improve echo_bot example

Astro created

Change summary

examples/echo_bot.rs | 47 +++++++++++++++++++++++++--------------------
1 file changed, 26 insertions(+), 21 deletions(-)

Detailed changes

examples/echo_bot.rs 🔗

@@ -24,17 +24,32 @@ fn main() {
     // Make the two interfaces for sending and receiving independent
     // of each other so we can move one into a closure.
     let (sink, stream) = client.split();
-    let mut sink_state = Some(sink);
+
+    // Create outgoing pipe
+    let (mut tx, rx) = futures::unsync::mpsc::unbounded();
+    rt.spawn(
+        rx.forward(
+            sink.sink_map_err(|_| panic!("Pipe"))
+        )
+            .map(|(rx, mut sink)| {
+                drop(rx);
+                let _ = sink.close();
+            })
+            .map_err(|e| {
+                panic!("Send error: {:?}", e);
+            })
+    );
+
     // Main loop, processes events
+    let mut wait_for_stream_end = false;
     let done = stream.for_each(move |event| {
-        let mut sink_future = None;
-
-        if event.is_online() {
+        if wait_for_stream_end {
+            /* Do nothing */
+        } else if event.is_online() {
             println!("Online!");
 
             let presence = make_presence();
-            let sink = sink_state.take().unwrap();
-            sink_future = Some(Box::new(sink.send(Packet::Stanza(presence))));
+            tx.start_send(Packet::Stanza(presence)).unwrap();
         } else if let Some(message) = event
             .into_stanza()
             .and_then(|stanza| Message::try_from(stanza).ok())
@@ -42,31 +57,21 @@ fn main() {
             match (message.from, message.bodies.get("")) {
                 (Some(ref from), Some(ref body)) if body.0 == "die" => {
                     println!("Secret die command triggered by {}", from);
-                    let sink = sink_state.take().unwrap();
-                    sink_future = Some(Box::new(sink.send(Packet::StreamEnd)));
+                    wait_for_stream_end = true;
+                    tx.start_send(Packet::StreamEnd).unwrap();
                 }
                 (Some(ref from), Some(ref body)) => {
                     if message.type_ != MessageType::Error {
                         // This is a message we'll echo
                         let reply = make_reply(from.clone(), &body.0);
-                        let sink = sink_state.take().unwrap();
-                        sink_future = Some(Box::new(sink.send(Packet::Stanza(reply))));
+                        tx.start_send(Packet::Stanza(reply)).unwrap();
                     }
                 }
                 _ => {}
             }
-        };
+        }
 
-        sink_future
-            .map(|future| {
-                let wait_send: Box<Future<Item = (), Error = tokio_xmpp::Error>> =
-                    Box::new(future
-                             .map(|sink| {
-                                 sink_state = Some(sink);
-                             }));
-                wait_send
-            })
-            .unwrap_or_else(|| Box::new(future::ok(())))
+        future::ok(())
     });
 
     // Start polling `done`