XMPPCodec Input shall be just one Packet

Astro created

Change summary

src/lib.rs        |  2 +-
src/xmpp_codec.rs | 34 +++++++++++++++++++---------------
2 files changed, 20 insertions(+), 16 deletions(-)

Detailed changes

src/lib.rs 🔗

@@ -76,7 +76,7 @@ impl Future for TcpClient {
             TcpClientState::RecvStart(ref mut opt_xmpp_stream) => {
                 let mut xmpp_stream = opt_xmpp_stream.take().unwrap();
                 match xmpp_stream.poll() {
-                    Ok(Async::Ready(Some(events))) => println!("Recv start: {:?}", events),
+                    Ok(Async::Ready(Some(Packet::StreamStart))) => println!("Recv start!"),
                     Ok(Async::Ready(_)) => return Err(std::io::Error::from(ErrorKind::InvalidData)),
                     Ok(Async::NotReady) => {
                         *opt_xmpp_stream = Some(xmpp_stream);

src/xmpp_codec.rs 🔗

@@ -68,12 +68,14 @@ impl XMPPCodec {
 }
 
 impl Codec for XMPPCodec {
-    type In = Vec<Packet>;
+    type In = Packet;
     type Out = Packet;
 
     fn decode(&mut self, buf: &mut EasyBuf) -> Result<Option<Self::In>, Error> {
         println!("XMPPCodec.decode {:?}", buf.len());
-        match from_utf8(buf.as_slice()) {
+        let buf_len = buf.len();
+        let chunk = buf.drain_to(buf_len);
+        match from_utf8(chunk.as_slice()) {
             Ok(s) =>
                 self.parser.feed_str(s),
             Err(e) =>
@@ -81,7 +83,7 @@ impl Codec for XMPPCodec {
         }
 
         let mut new_root = None;
-        let mut results = Vec::new();
+        let mut result = None;
         for event in &mut self.parser {
             match &mut self.root {
                 &mut None => {
@@ -89,10 +91,13 @@ impl Codec for XMPPCodec {
                     match event {
                         Ok(xml::Event::ElementStart(start_tag)) => {
                             new_root = Some(XMPPRoot::new(start_tag));
-                            results.push(Packet::StreamStart);
+                            result = Some(Packet::StreamStart);
+                            break
+                        },
+                        Err(e) => {
+                            result = Some(Packet::Error(Box::new(e)));
+                            break
                         },
-                        Err(e) =>
-                            results.push(Packet::Error(Box::new(e))),
                         _ =>
                             (),
                     }
@@ -103,10 +108,13 @@ impl Codec for XMPPCodec {
                         None => (),
                         Some(Ok(stanza)) => {
                             println!("stanza: {}", stanza);
-                            results.push(Packet::Stanza(stanza));
+                            result = Some(Packet::Stanza(stanza));
+                            break
                         },
-                        Some(Err(e)) =>
-                            results.push(Packet::Error(Box::new(e))),
+                        Some(Err(e)) => {
+                            result = Some(Packet::Error(Box::new(e)));
+                            break
+                        }
                     };
                 },
             }
@@ -117,11 +125,7 @@ impl Codec for XMPPCodec {
             }
         }
 
-        if results.len() == 0 {
-            Ok(None)
-        } else {
-            Ok(Some(results))
-        }
+        Ok(result)
     }
 
     fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> Result<(), Error> {
@@ -147,6 +151,6 @@ impl Codec for XMPPCodec {
     }
 
     fn decode_eof(&mut self, _buf: &mut EasyBuf) -> Result<Self::In, Error> {
-        Ok(vec!())
+        Err(Error::from(ErrorKind::UnexpectedEof))
     }
 }