client: stream.poll_complete() for ease of use

Astro created

Change summary

examples/echo_bot.rs       | 13 +++----------
examples/echo_component.rs | 13 +++----------
src/client/mod.rs          |  9 +++++++++
src/component/mod.rs       |  9 +++++++++
4 files changed, 24 insertions(+), 20 deletions(-)

Detailed changes

examples/echo_bot.rs 🔗

@@ -10,7 +10,7 @@ use std::env::args;
 use std::process::exit;
 use try_from::TryFrom;
 use tokio_core::reactor::Core;
-use futures::{Future, Stream, Sink, future};
+use futures::{Stream, Sink, future};
 use tokio_xmpp::Client;
 use minidom::Element;
 use xmpp_parsers::presence::{Presence, Type as PresenceType, Show as PresenceShow};
@@ -33,18 +33,11 @@ fn main() {
 
     // Make the two interfaces for sending and receiving independent
     // of each other so we can move one into a closure.
-    let (sink, stream) = client.split();
+    let (mut sink, stream) = client.split();
     // Wrap sink in Option so that we can take() it for the send(self)
     // to consume and return it back when ready.
-    let mut sink = Some(sink);
     let mut send = move |stanza| {
-        sink = Some(
-            sink.take().
-                expect("sink")
-                .send(stanza)
-                .wait()
-                .expect("sink.send")
-        );
+        sink.start_send(stanza).expect("start_send");
     };
     // Main loop, processes events
     let done = stream.for_each(|event| {

examples/echo_component.rs 🔗

@@ -11,7 +11,7 @@ use std::process::exit;
 use std::str::FromStr;
 use try_from::TryFrom;
 use tokio_core::reactor::Core;
-use futures::{Future, Stream, Sink, future};
+use futures::{Stream, Sink, future};
 use tokio_xmpp::Component;
 use minidom::Element;
 use xmpp_parsers::presence::{Presence, Type as PresenceType, Show as PresenceShow};
@@ -38,18 +38,11 @@ fn main() {
     // Make the two interfaces for sending and receiving independent
     // of each other so we can move one into a closure.
     println!("Got it: {}", component.jid);
-    let (sink, stream) = component.split();
+    let (mut sink, stream) = component.split();
     // Wrap sink in Option so that we can take() it for the send(self)
     // to consume and return it back when ready.
-    let mut sink = Some(sink);
     let mut send = move |stanza| {
-        sink = Some(
-            sink.take().
-                expect("sink")
-                .send(stanza)
-                .wait()
-                .expect("sink.send")
-        );
+        sink.start_send(stanza).expect("start_send");
     };
     // Main loop, processes events
     let done = stream.for_each(|event| {

src/client/mod.rs 🔗

@@ -137,6 +137,15 @@ impl Stream for Client {
                 }
             },
             ClientState::Connected(mut stream) => {
+                // Poll sink
+                match stream.poll_complete() {
+                    Ok(Async::NotReady) => (),
+                    Ok(Async::Ready(())) => (),
+                    Err(e) =>
+                        return Err(e.description().to_owned()),
+                };
+
+                // Poll stream
                 match stream.poll() {
                     Ok(Async::Ready(None)) => {
                         // EOF

src/component/mod.rs 🔗

@@ -92,6 +92,15 @@ impl Stream for Component {
                 }
             },
             ComponentState::Connected(mut stream) => {
+                // Poll sink
+                match stream.poll_complete() {
+                    Ok(Async::NotReady) => (),
+                    Ok(Async::Ready(())) => (),
+                    Err(e) =>
+                        return Err(e.description().to_owned()),
+                };
+
+                // Poll stream
                 match stream.poll() {
                     Ok(Async::NotReady) => {
                         self.state = ComponentState::Connected(stream);