xmlstream: improve responder-side of stream resets

Jonas Schäfer created

This makes the stream resets a lot safer, by preventing the forbidden
send-read-reset combination of events: the reset function on the
responder side now takes the element to send right before the reset,
enforcing a send-reset pattern.

Change summary

tokio-xmpp/src/xmlstream/mod.rs   | 41 +++++++++++++++++++++++++++++---
tokio-xmpp/src/xmlstream/tests.rs | 11 ++++----
2 files changed, 42 insertions(+), 10 deletions(-)

Detailed changes

tokio-xmpp/src/xmlstream/mod.rs 🔗

@@ -36,12 +36,31 @@
 //!    object.
 //! 3. Call [`PendingFeaturesSend::send_features`] to send the stream features
 //!    to the peer and obtain the [`XmlStream`] object.
+//!
+//! ## Mid-stream resets
+//!
+//! RFC 6120 describes a couple of situations where stream resets are executed
+//! during stream negotiation. During a stream reset, both parties drop their
+//! parser state and the stream is started from the beginning, with a new
+//! stream header sent by the initiator and received by the responder.
+//!
+//! Stream resets are inherently prone to race conditions. If the responder
+//! executes a read from the underlying transport between sending the element
+//! which triggers the stream reset and discarding its parser state, it may
+//! accidentally read the initiator's stream header into the *old* parser
+//! state instead of the post-reset parser state.
+//!
+//! Stream resets are executed with the [`XmlStream::initiate_reset`] and
+//! [`XmlStream::accept_reset`] functions, for initiator and responder,
+//! respectively. In order to avoid the race condition,
+//! [`XmlStream::accept_reset`] handles sending the last pre-reset element and
+//! resetting the stream in a single step.
 
 use core::pin::Pin;
 use core::task::{Context, Poll};
 use std::io;
 
-use futures::{ready, Sink, Stream};
+use futures::{ready, Sink, SinkExt, Stream};
 
 use tokio::io::{AsyncBufRead, AsyncWrite};
 
@@ -215,10 +234,23 @@ impl<Io: AsyncBufRead + AsyncWrite + Unpin, T: FromXml + AsXml> XmlStream<Io, T>
         InitiatingStream(stream)
     }
 
-    /// Anticipate a new stream header sent by the remote party.
+    /// Trigger a stream reset on the initiator side and await the new stream
+    /// header.
     ///
     /// This is the responder-side counterpart to
-    /// [`initiate_reset`][`Self::initiate_reset`].
+    /// [`initiate_reset`][`Self::initiate_reset`]. The element which causes
+    /// the stream reset must be passed as `barrier` and it will be sent
+    /// right before resetting the parser state. This way, the race condition
+    /// outlined in the [`xmlstream`][`self`] module's documentation is
+    /// guaranteed to be avoided.
+    ///
+    /// Note that you should not send the element passed as `barrier` down the
+    /// stream yourself, as this function takes care of it.
+    ///
+    /// # Stream resets without a triggering element
+    ///
+    /// These are not possible to do safely and not specified in RFC 6120,
+    /// hence they cannot be done in [`XmlStream`].
     ///
     /// # Panics
     ///
@@ -228,8 +260,9 @@ impl<Io: AsyncBufRead + AsyncWrite + Unpin, T: FromXml + AsXml> XmlStream<Io, T>
     ///
     /// In addition, attempting to reset a stream which has been closed by
     /// either side or which has had an I/O error will also cause a panic.
-    pub async fn accept_reset(self) -> io::Result<AcceptedStream<Io>> {
+    pub async fn accept_reset(mut self, barrier: &T) -> io::Result<AcceptedStream<Io>> {
         self.assert_retypable();
+        self.send(barrier).await?;
 
         let mut stream = self.inner;
         Pin::new(&mut stream).reset_state();

tokio-xmpp/src/xmlstream/tests.rs 🔗

@@ -220,16 +220,15 @@ async fn test_exchange_data_stream_reset_and_shutdown() {
         let mut stream = stream
             .send_features::<Data>(&StreamFeatures::default())
             .await?;
-        stream
-            .send(&Data {
-                contents: "world!".to_owned(),
-            })
-            .await?;
         match stream.next().await {
             Some(Ok(Data { contents })) => assert_eq!(contents, "hello"),
             other => panic!("unexpected stream message: {:?}", other),
         }
-        let stream = stream.accept_reset().await?;
+        let stream = stream
+            .accept_reset(&Data {
+                contents: "world!".to_owned(),
+            })
+            .await?;
         assert_eq!(stream.header().from.unwrap(), "client");
         assert_eq!(stream.header().to.unwrap(), "server");
         assert_eq!(stream.header().id.unwrap(), "client-id");