diff --git a/tokio-xmpp/src/client/mod.rs b/tokio-xmpp/src/client/mod.rs index 0197ccd30fa5b06c141aa7318d045a34cfcacf1e..851350a7f5656adfef3ff85b07b714dc38b9921e 100644 --- a/tokio-xmpp/src/client/mod.rs +++ b/tokio-xmpp/src/client/mod.rs @@ -5,7 +5,7 @@ use crate::{ client::{login::client_login, stream::ClientState}, connect::ServerConnector, error::Error, - xmlstream::Timeouts, + xmlstream::{Timeouts, XmppStream, XmppStreamElement}, Stanza, }; @@ -75,7 +75,9 @@ impl Client { /// Make sure to disable reconnect. pub async fn send_end(&mut self) -> Result<(), Error> { match self.state { - ClientState::Connected { ref mut stream, .. } => Ok(stream.close().await?), + ClientState::Connected { ref mut stream, .. } => { + Ok( as SinkExt<&XmppStreamElement>>::close(stream).await?) + } ClientState::Connecting { .. } => { self.state = ClientState::Disconnected; Ok(()) diff --git a/tokio-xmpp/src/client/stream.rs b/tokio-xmpp/src/client/stream.rs index 8120a8702cb97e80b5dbd99edb295d3de56f9245..d324c5b01d183ec410908cdd13ccb2cae7685e3f 100644 --- a/tokio-xmpp/src/client/stream.rs +++ b/tokio-xmpp/src/client/stream.rs @@ -97,7 +97,10 @@ impl Stream for Client { bound_jid, } => { // Poll sink - match Pin::new(&mut stream).poll_ready(cx) { + match as Sink<&XmppStreamElement>>::poll_ready( + Pin::new(&mut stream), + cx, + ) { Poll::Pending => (), Poll::Ready(Ok(())) => (), Poll::Ready(Err(e)) => { @@ -195,36 +198,45 @@ impl Sink for Client { fn start_send(mut self: Pin<&mut Self>, item: Stanza) -> Result<(), Self::Error> { match self.state { - ClientState::Connected { ref mut stream, .. } => Pin::new(stream) - .start_send(&item.into()) - .map_err(|e| e.into()), + ClientState::Connected { ref mut stream, .. } => { + Pin::new(stream).start_send(&item).map_err(|e| e.into()) + } _ => Err(Error::InvalidState), } } fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { match self.state { - ClientState::Connected { ref mut stream, .. } => { - Pin::new(stream).poll_ready(cx).map_err(|e| e.into()) - } + ClientState::Connected { ref mut stream, .. } => as Sink< + &XmppStreamElement, + >>::poll_ready( + Pin::new(stream), cx + ) + .map_err(|e| e.into()), _ => Poll::Pending, } } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { match self.state { - ClientState::Connected { ref mut stream, .. } => { - Pin::new(stream).poll_flush(cx).map_err(|e| e.into()) - } + ClientState::Connected { ref mut stream, .. } => as Sink< + &XmppStreamElement, + >>::poll_flush( + Pin::new(stream), cx + ) + .map_err(|e| e.into()), _ => Poll::Pending, } } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { match self.state { - ClientState::Connected { ref mut stream, .. } => { - Pin::new(stream).poll_close(cx).map_err(|e| e.into()) - } + ClientState::Connected { ref mut stream, .. } => as Sink< + &XmppStreamElement, + >>::poll_close( + Pin::new(stream), cx + ) + .map_err(|e| e.into()), _ => Poll::Pending, } } diff --git a/tokio-xmpp/src/component/stream.rs b/tokio-xmpp/src/component/stream.rs index b1d0d4667aa6c4cea41b810785764e412ccdd72e..01d2d1124ea9a6aa5bc1e0d3cf69ba63894ce910 100644 --- a/tokio-xmpp/src/component/stream.rs +++ b/tokio-xmpp/src/component/stream.rs @@ -6,7 +6,10 @@ use std::pin::Pin; use std::task::Context; use crate::{ - component::Component, connect::ServerConnector, xmlstream::XmppStreamElement, Error, Stanza, + component::Component, + connect::ServerConnector, + xmlstream::{XmppStream, XmppStreamElement}, + Error, Stanza, }; impl Stream for Component { @@ -42,25 +45,31 @@ impl Sink for Component { fn start_send(mut self: Pin<&mut Self>, item: Stanza) -> Result<(), Self::Error> { Pin::new(&mut self.stream) - .start_send(&item.into()) + .start_send(&item) .map_err(|e| e.into()) } fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Pin::new(&mut self.stream) - .poll_ready(cx) - .map_err(|e| e.into()) + as Sink<&XmppStreamElement>>::poll_ready( + Pin::new(&mut self.stream), + cx, + ) + .map_err(|e| e.into()) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Pin::new(&mut self.stream) - .poll_flush(cx) - .map_err(|e| e.into()) + as Sink<&XmppStreamElement>>::poll_flush( + Pin::new(&mut self.stream), + cx, + ) + .map_err(|e| e.into()) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Pin::new(&mut self.stream) - .poll_close(cx) - .map_err(|e| e.into()) + as Sink<&XmppStreamElement>>::poll_close( + Pin::new(&mut self.stream), + cx, + ) + .map_err(|e| e.into()) } } diff --git a/tokio-xmpp/src/event.rs b/tokio-xmpp/src/event.rs index 58d5b26de827c81af4f5cfc499306a93e7b28d64..3d75adf29a3fb9dbadf4a4794712585a6863431e 100644 --- a/tokio-xmpp/src/event.rs +++ b/tokio-xmpp/src/event.rs @@ -6,6 +6,7 @@ use rand::{thread_rng, Rng}; use xmpp_parsers::{iq::Iq, jid::Jid, message::Message, presence::Presence}; +use xso::{AsXml, FromXml}; use crate::xmlstream::XmppStreamElement; use crate::Error; @@ -16,15 +17,19 @@ fn make_id() -> String { } /// A stanza sent/received over the stream. -#[derive(Debug)] +#[derive(FromXml, AsXml, Debug)] +#[xml()] pub enum Stanza { /// IQ stanza + #[xml(transparent)] Iq(Iq), /// Message stanza + #[xml(transparent)] Message(Message), /// Presence stanza + #[xml(transparent)] Presence(Presence), } diff --git a/tokio-xmpp/src/xmlstream/mod.rs b/tokio-xmpp/src/xmlstream/mod.rs index 1ad1ef623abc74537f6ae61b2187bb7fded06e8e..002fbdc0cd4d39531555bda94b26457134fc1043 100644 --- a/tokio-xmpp/src/xmlstream/mod.rs +++ b/tokio-xmpp/src/xmlstream/mod.rs @@ -454,7 +454,7 @@ impl XmlStream { } } -impl<'x, Io: AsyncWrite, T: FromXml + AsXml> Sink<&'x T> for XmlStream { +impl<'x, Io: AsyncWrite, T: FromXml + AsXml, U: AsXml> Sink<&'x U> for XmlStream { type Error = io::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -475,7 +475,7 @@ impl<'x, Io: AsyncWrite, T: FromXml + AsXml> Sink<&'x T> for XmlStream { this.inner.poll_close(cx) } - fn start_send(self: Pin<&mut Self>, item: &'x T) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: &'x U) -> Result<(), Self::Error> { let this = self.project(); this.write_state.check_writable()?; this.inner.start_send_xso(item) diff --git a/tokio-xmpp/src/xmlstream/tests.rs b/tokio-xmpp/src/xmlstream/tests.rs index 3c8d8745608b32bac19f20fd0fb71ccc1a06bc33..b45ddd8620519d25f51544983a244e5c55f8e31f 100644 --- a/tokio-xmpp/src/xmlstream/tests.rs +++ b/tokio-xmpp/src/xmlstream/tests.rs @@ -158,7 +158,7 @@ async fn test_clean_shutdown() { ) .await?; let (_, mut stream) = stream.recv_features::().await?; - stream.close().await?; + SinkExt::<&Data>::close(&mut stream).await?; match stream.next().await { Some(Err(ReadError::StreamFooterReceived)) => (), other => panic!("unexpected stream message: {:?}", other), @@ -181,7 +181,7 @@ async fn test_clean_shutdown() { Some(Err(ReadError::StreamFooterReceived)) => (), other => panic!("unexpected stream message: {:?}", other), } - stream.close().await?; + SinkExt::<&Data>::close(&mut stream).await?; Ok::<_, io::Error>(()) }); @@ -229,7 +229,7 @@ async fn test_exchange_data_stream_reset_and_shutdown() { contents: "once more".to_owned(), }) .await?; - stream.close().await?; + SinkExt::<&Data>::close(&mut stream).await?; match stream.next().await { Some(Ok(Data { contents })) => assert_eq!(contents, "hello world!"), other => panic!("unexpected stream message: {:?}", other), @@ -283,7 +283,7 @@ async fn test_exchange_data_stream_reset_and_shutdown() { Some(Ok(Data { contents })) => assert_eq!(contents, "once more"), other => panic!("unexpected stream message: {:?}", other), } - stream.close().await?; + SinkExt::<&Data>::close(&mut stream).await?; match stream.next().await { Some(Err(ReadError::StreamFooterReceived)) => (), other => panic!("unexpected stream message: {:?}", other), @@ -427,7 +427,7 @@ async fn test_can_receive_after_shutdown() { contents: "world!".to_owned(), }) .await?; - stream.close().await?; + as SinkExt<&Data>>::close(&mut stream).await?; Ok::<_, io::Error>(()) });