xmlstream: make sink implementation generic

Jonas SchΓ€fer created

This allows to use any serialisable type. The advantage is that moves
and clones are avoided (which would otherwise be needed to construct
e.g. a XmppStreamElement from a Stanza or Message).

Change summary

tokio-xmpp/src/client/mod.rs       |  6 +++-
tokio-xmpp/src/client/stream.rs    | 38 +++++++++++++++++++++----------
tokio-xmpp/src/component/stream.rs | 31 ++++++++++++++++---------
tokio-xmpp/src/event.rs            |  7 +++++
tokio-xmpp/src/xmlstream/mod.rs    |  4 +-
tokio-xmpp/src/xmlstream/tests.rs  | 10 ++++----
6 files changed, 62 insertions(+), 34 deletions(-)

Detailed changes

tokio-xmpp/src/client/mod.rs πŸ”—

@@ -5,7 +5,7 @@ use crate::{
     client::{login::client_login, stream::ClientState},
     connect::ServerConnector,
     error::Error,
-    xmlstream::Timeouts,
+    xmlstream::{Timeouts, XmppStream, XmppStreamElement},
     Stanza,
 };
 
@@ -75,7 +75,9 @@ impl<C: ServerConnector> Client<C> {
     /// Make sure to disable reconnect.
     pub async fn send_end(&mut self) -> Result<(), Error> {
         match self.state {
-            ClientState::Connected { ref mut stream, .. } => Ok(stream.close().await?),
+            ClientState::Connected { ref mut stream, .. } => {
+                Ok(<XmppStream<C::Stream> as SinkExt<&XmppStreamElement>>::close(stream).await?)
+            }
             ClientState::Connecting { .. } => {
                 self.state = ClientState::Disconnected;
                 Ok(())

tokio-xmpp/src/client/stream.rs πŸ”—

@@ -97,7 +97,10 @@ impl<C: ServerConnector> Stream for Client<C> {
                 bound_jid,
             } => {
                 // Poll sink
-                match Pin::new(&mut stream).poll_ready(cx) {
+                match <XmppStream<C::Stream> as Sink<&XmppStreamElement>>::poll_ready(
+                    Pin::new(&mut stream),
+                    cx,
+                ) {
                     Poll::Pending => (),
                     Poll::Ready(Ok(())) => (),
                     Poll::Ready(Err(e)) => {
@@ -195,36 +198,45 @@ impl<C: ServerConnector> Sink<Stanza> for Client<C> {
 
     fn start_send(mut self: Pin<&mut Self>, item: Stanza) -> Result<(), Self::Error> {
         match self.state {
-            ClientState::Connected { ref mut stream, .. } => Pin::new(stream)
-                .start_send(&item.into())
-                .map_err(|e| e.into()),
+            ClientState::Connected { ref mut stream, .. } => {
+                Pin::new(stream).start_send(&item).map_err(|e| e.into())
+            }
             _ => Err(Error::InvalidState),
         }
     }
 
     fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
         match self.state {
-            ClientState::Connected { ref mut stream, .. } => {
-                Pin::new(stream).poll_ready(cx).map_err(|e| e.into())
-            }
+            ClientState::Connected { ref mut stream, .. } => <XmppStream<C::Stream> as Sink<
+                &XmppStreamElement,
+            >>::poll_ready(
+                Pin::new(stream), cx
+            )
+            .map_err(|e| e.into()),
             _ => Poll::Pending,
         }
     }
 
     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
         match self.state {
-            ClientState::Connected { ref mut stream, .. } => {
-                Pin::new(stream).poll_flush(cx).map_err(|e| e.into())
-            }
+            ClientState::Connected { ref mut stream, .. } => <XmppStream<C::Stream> as Sink<
+                &XmppStreamElement,
+            >>::poll_flush(
+                Pin::new(stream), cx
+            )
+            .map_err(|e| e.into()),
             _ => Poll::Pending,
         }
     }
 
     fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
         match self.state {
-            ClientState::Connected { ref mut stream, .. } => {
-                Pin::new(stream).poll_close(cx).map_err(|e| e.into())
-            }
+            ClientState::Connected { ref mut stream, .. } => <XmppStream<C::Stream> as Sink<
+                &XmppStreamElement,
+            >>::poll_close(
+                Pin::new(stream), cx
+            )
+            .map_err(|e| e.into()),
             _ => Poll::Pending,
         }
     }

tokio-xmpp/src/component/stream.rs πŸ”—

@@ -6,7 +6,10 @@ use std::pin::Pin;
 use std::task::Context;
 
 use crate::{
-    component::Component, connect::ServerConnector, xmlstream::XmppStreamElement, Error, Stanza,
+    component::Component,
+    connect::ServerConnector,
+    xmlstream::{XmppStream, XmppStreamElement},
+    Error, Stanza,
 };
 
 impl<C: ServerConnector> Stream for Component<C> {
@@ -42,25 +45,31 @@ impl<C: ServerConnector> Sink<Stanza> for Component<C> {
 
     fn start_send(mut self: Pin<&mut Self>, item: Stanza) -> Result<(), Self::Error> {
         Pin::new(&mut self.stream)
-            .start_send(&item.into())
+            .start_send(&item)
             .map_err(|e| e.into())
     }
 
     fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
-        Pin::new(&mut self.stream)
-            .poll_ready(cx)
-            .map_err(|e| e.into())
+        <XmppStream<C::Stream> as Sink<&XmppStreamElement>>::poll_ready(
+            Pin::new(&mut self.stream),
+            cx,
+        )
+        .map_err(|e| e.into())
     }
 
     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
-        Pin::new(&mut self.stream)
-            .poll_flush(cx)
-            .map_err(|e| e.into())
+        <XmppStream<C::Stream> as Sink<&XmppStreamElement>>::poll_flush(
+            Pin::new(&mut self.stream),
+            cx,
+        )
+        .map_err(|e| e.into())
     }
 
     fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
-        Pin::new(&mut self.stream)
-            .poll_close(cx)
-            .map_err(|e| e.into())
+        <XmppStream<C::Stream> as Sink<&XmppStreamElement>>::poll_close(
+            Pin::new(&mut self.stream),
+            cx,
+        )
+        .map_err(|e| e.into())
     }
 }

tokio-xmpp/src/event.rs πŸ”—

@@ -6,6 +6,7 @@
 
 use rand::{thread_rng, Rng};
 use xmpp_parsers::{iq::Iq, jid::Jid, message::Message, presence::Presence};
+use xso::{AsXml, FromXml};
 
 use crate::xmlstream::XmppStreamElement;
 use crate::Error;
@@ -16,15 +17,19 @@ fn make_id() -> String {
 }
 
 /// A stanza sent/received over the stream.
-#[derive(Debug)]
+#[derive(FromXml, AsXml, Debug)]
+#[xml()]
 pub enum Stanza {
     /// IQ stanza
+    #[xml(transparent)]
     Iq(Iq),
 
     /// Message stanza
+    #[xml(transparent)]
     Message(Message),
 
     /// Presence stanza
+    #[xml(transparent)]
     Presence(Presence),
 }
 

tokio-xmpp/src/xmlstream/mod.rs πŸ”—

@@ -454,7 +454,7 @@ impl<Io: AsyncWrite + Unpin, T: FromXml + AsXml> XmlStream<Io, T> {
     }
 }
 
-impl<'x, Io: AsyncWrite, T: FromXml + AsXml> Sink<&'x T> for XmlStream<Io, T> {
+impl<'x, Io: AsyncWrite, T: FromXml + AsXml, U: AsXml> Sink<&'x U> for XmlStream<Io, T> {
     type Error = io::Error;
 
     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@@ -475,7 +475,7 @@ impl<'x, Io: AsyncWrite, T: FromXml + AsXml> Sink<&'x T> for XmlStream<Io, T> {
         this.inner.poll_close(cx)
     }
 
-    fn start_send(self: Pin<&mut Self>, item: &'x T) -> Result<(), Self::Error> {
+    fn start_send(self: Pin<&mut Self>, item: &'x U) -> Result<(), Self::Error> {
         let this = self.project();
         this.write_state.check_writable()?;
         this.inner.start_send_xso(item)

tokio-xmpp/src/xmlstream/tests.rs πŸ”—

@@ -158,7 +158,7 @@ async fn test_clean_shutdown() {
         )
         .await?;
         let (_, mut stream) = stream.recv_features::<Data>().await?;
-        stream.close().await?;
+        SinkExt::<&Data>::close(&mut stream).await?;
         match stream.next().await {
             Some(Err(ReadError::StreamFooterReceived)) => (),
             other => panic!("unexpected stream message: {:?}", other),
@@ -181,7 +181,7 @@ async fn test_clean_shutdown() {
             Some(Err(ReadError::StreamFooterReceived)) => (),
             other => panic!("unexpected stream message: {:?}", other),
         }
-        stream.close().await?;
+        SinkExt::<&Data>::close(&mut stream).await?;
         Ok::<_, io::Error>(())
     });
 
@@ -229,7 +229,7 @@ async fn test_exchange_data_stream_reset_and_shutdown() {
                 contents: "once more".to_owned(),
             })
             .await?;
-        stream.close().await?;
+        SinkExt::<&Data>::close(&mut stream).await?;
         match stream.next().await {
             Some(Ok(Data { contents })) => assert_eq!(contents, "hello world!"),
             other => panic!("unexpected stream message: {:?}", other),
@@ -283,7 +283,7 @@ async fn test_exchange_data_stream_reset_and_shutdown() {
             Some(Ok(Data { contents })) => assert_eq!(contents, "once more"),
             other => panic!("unexpected stream message: {:?}", other),
         }
-        stream.close().await?;
+        SinkExt::<&Data>::close(&mut stream).await?;
         match stream.next().await {
             Some(Err(ReadError::StreamFooterReceived)) => (),
             other => panic!("unexpected stream message: {:?}", other),
@@ -427,7 +427,7 @@ async fn test_can_receive_after_shutdown() {
                 contents: "world!".to_owned(),
             })
             .await?;
-        stream.close().await?;
+        <XmlStream<_, _> as SinkExt<&Data>>::close(&mut stream).await?;
         Ok::<_, io::Error>(())
     });