xmpp_codec, client: handle StreamEnd

Astro created

Change summary

src/client/mod.rs | 20 +++++++++++++++++---
src/error.rs      |  2 ++
src/xmpp_codec.rs | 19 +++++++++++++++++++
3 files changed, 38 insertions(+), 3 deletions(-)

Detailed changes

src/client/mod.rs 🔗

@@ -164,10 +164,24 @@ impl Stream for Client {
                         Ok(Async::Ready(Some(Event::Disconnected)))
                     }
                     Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
+                        // Receive stanza
                         self.state = ClientState::Connected(stream);
                         Ok(Async::Ready(Some(Event::Stanza(stanza))))
                     }
-                    Ok(Async::NotReady) | Ok(Async::Ready(_)) => {
+                    Ok(Async::Ready(Some(Packet::Text(_)))) => {
+                        // Ignore text between stanzas
+                        Ok(Async::NotReady)
+                    }
+                    Ok(Async::Ready(Some(Packet::StreamStart(_)))) => {
+                        // <stream:stream>
+                        Err(ProtocolError::InvalidStreamStart.into())
+                    }
+                    Ok(Async::Ready(Some(Packet::StreamEnd))) => {
+                        // End of stream: </stream:stream>
+                        Ok(Async::Ready(None))
+                    }
+                    Ok(Async::NotReady) => {
+                        // Try again later
                         self.state = ClientState::Connected(stream);
                         Ok(Async::NotReady)
                     }
@@ -212,8 +226,8 @@ impl Sink for Client {
     /// This closes the inner TCP stream.
     ///
     /// To synchronize your shutdown with the server side, you should
-    /// first send `Packet::StreamEnd` and wait it to be sent back
-    /// before closing the connection.
+    /// first send `Packet::StreamEnd` and wait for the end of the
+    /// incoming stream before closing the connection.
     fn close(&mut self) -> Poll<(), Self::SinkError> {
         match self.state {
             ClientState::Connected(ref mut stream) =>

src/error.rs 🔗

@@ -88,6 +88,8 @@ pub enum ProtocolError {
     NoStreamId,
     /// Encountered an unexpected XML token
     InvalidToken,
+    /// Unexpected <stream:stream> (shouldn't occur)
+    InvalidStreamStart,
 }
 
 /// Authentication error

src/xmpp_codec.rs 🔗

@@ -378,6 +378,25 @@ mod tests {
         });
     }
 
+    #[test]
+    fn test_stream_end() {
+        let mut c = XMPPCodec::new();
+        let mut b = BytesMut::with_capacity(1024);
+        b.put(r"<?xml version='1.0'?><stream:stream xmlns:stream='http://etherx.jabber.org/streams' version='1.0' xmlns='jabber:client'>");
+        let r = c.decode(&mut b);
+        assert!(match r {
+            Ok(Some(Packet::StreamStart(_))) => true,
+            _ => false,
+        });
+        b.clear();
+        b.put(r"</stream:stream>");
+        let r = c.decode(&mut b);
+        assert!(match r {
+            Ok(Some(Packet::StreamEnd)) => true,
+            _ => false,
+        });
+    }
+
     #[test]
     fn test_truncated_stanza() {
         let mut c = XMPPCodec::new();