client: switch SinkItem to Packet

Astro created

this breaks backwards compatibility

Change summary

examples/echo_bot.rs | 10 +++++-----
src/client/mod.rs    | 19 ++++---------------
src/lib.rs           |  1 +
3 files changed, 10 insertions(+), 20 deletions(-)

Detailed changes

examples/echo_bot.rs 🔗

@@ -2,7 +2,7 @@ use futures::{future, Future, Sink, Stream};
 use std::env::args;
 use std::process::exit;
 use tokio::runtime::current_thread::Runtime;
-use tokio_xmpp::Client;
+use tokio_xmpp::{Client, Packet};
 use xmpp_parsers::{Jid, Element, TryFrom};
 use xmpp_parsers::message::{Body, Message, MessageType};
 use xmpp_parsers::presence::{Presence, Show as PresenceShow, Type as PresenceType};
@@ -34,7 +34,7 @@ fn main() {
 
             let presence = make_presence();
             let sink = sink_state.take().unwrap();
-            sink_future = Some(Box::new(sink.send(presence)));
+            sink_future = Some(Box::new(sink.send(Packet::Stanza(presence))));
         } else if let Some(message) = event
             .into_stanza()
             .and_then(|stanza| Message::try_from(stanza).ok())
@@ -42,15 +42,15 @@ fn main() {
             match (message.from, message.bodies.get("")) {
                 (Some(ref from), Some(ref body)) if body.0 == "die" => {
                     println!("Secret die command triggered by {}", from);
-                    let sink = sink_state.as_mut().unwrap();
-                    sink.close().expect("close");
+                    let sink = sink_state.take().unwrap();
+                    sink_future = Some(Box::new(sink.send(Packet::StreamEnd)));
                 }
                 (Some(ref from), Some(ref body)) => {
                     if message.type_ != MessageType::Error {
                         // This is a message we'll echo
                         let reply = make_reply(from.clone(), &body.0);
                         let sink = sink_state.take().unwrap();
-                        sink_future = Some(Box::new(sink.send(reply)));
+                        sink_future = Some(Box::new(sink.send(Packet::Stanza(reply))));
                     }
                 }
                 _ => {}

src/client/mod.rs 🔗

@@ -1,6 +1,6 @@
 use futures::{done, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
 use idna;
-use xmpp_parsers::{Jid, JidParseError, Element};
+use xmpp_parsers::{Jid, JidParseError};
 use sasl::common::{ChannelBinding, Credentials};
 use std::mem::replace;
 use std::str::FromStr;
@@ -193,24 +193,13 @@ impl Stream for Client {
 }
 
 impl Sink for Client {
-    type SinkItem = Element;
+    type SinkItem = Packet;
     type SinkError = Error;
 
     fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
         match self.state {
-            ClientState::Connected(ref mut stream) => {
-                match stream.start_send(Packet::Stanza(item)) {
-                    Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) =>
-                        Ok(AsyncSink::NotReady(stanza)),
-                    Ok(AsyncSink::NotReady(_)) => {
-                        panic!("Client.start_send with stanza but got something else back")
-                    }
-                    Ok(AsyncSink::Ready) =>
-                        Ok(AsyncSink::Ready),
-                    Err(e) =>
-                        Err(e)?,
-                }
-            }
+            ClientState::Connected(ref mut stream) =>
+                Ok(stream.start_send(item)?),
             _ =>
                 Ok(AsyncSink::NotReady(item)),
         }

src/lib.rs 🔗

@@ -8,6 +8,7 @@ extern crate derive_error;
 mod starttls;
 mod stream_start;
 pub mod xmpp_codec;
+pub use crate::xmpp_codec::Packet;
 pub mod xmpp_stream;
 pub use crate::starttls::StartTlsClient;
 mod event;