From c85c98b0bf69715d336de1e6db31498a9e473525 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Sch=C3=A4fer?= Date: Wed, 14 Aug 2024 18:05:05 +0200 Subject: [PATCH] xmlstream: improve responder-side of stream resets This makes the stream resets a lot safer, by preventing the forbidden send-read-reset combination of events: the reset function on the responder side now takes the element to send right before the reset, enforcing a send-reset pattern. --- tokio-xmpp/src/xmlstream/mod.rs | 41 ++++++++++++++++++++++++++++--- tokio-xmpp/src/xmlstream/tests.rs | 11 ++++----- 2 files changed, 42 insertions(+), 10 deletions(-) diff --git a/tokio-xmpp/src/xmlstream/mod.rs b/tokio-xmpp/src/xmlstream/mod.rs index 799c6306681ebe165b5622405a7d5b791776b668..796020fc9c154b1f2b4a84a2b85e4df09839b7fb 100644 --- a/tokio-xmpp/src/xmlstream/mod.rs +++ b/tokio-xmpp/src/xmlstream/mod.rs @@ -36,12 +36,31 @@ //! object. //! 3. Call [`PendingFeaturesSend::send_features`] to send the stream features //! to the peer and obtain the [`XmlStream`] object. +//! +//! ## Mid-stream resets +//! +//! RFC 6120 describes a couple of situations where stream resets are executed +//! during stream negotiation. During a stream reset, both parties drop their +//! parser state and the stream is started from the beginning, with a new +//! stream header sent by the initiator and received by the responder. +//! +//! Stream resets are inherently prone to race conditions. If the responder +//! executes a read from the underlying transport between sending the element +//! which triggers the stream reset and discarding its parser state, it may +//! accidentally read the initiator's stream header into the *old* parser +//! state instead of the post-reset parser state. +//! +//! Stream resets are executed with the [`XmlStream::initiate_reset`] and +//! [`XmlStream::accept_reset`] functions, for initiator and responder, +//! respectively. In order to avoid the race condition, +//! [`XmlStream::accept_reset`] handles sending the last pre-reset element and +//! resetting the stream in a single step. use core::pin::Pin; use core::task::{Context, Poll}; use std::io; -use futures::{ready, Sink, Stream}; +use futures::{ready, Sink, SinkExt, Stream}; use tokio::io::{AsyncBufRead, AsyncWrite}; @@ -215,10 +234,23 @@ impl XmlStream InitiatingStream(stream) } - /// Anticipate a new stream header sent by the remote party. + /// Trigger a stream reset on the initiator side and await the new stream + /// header. /// /// This is the responder-side counterpart to - /// [`initiate_reset`][`Self::initiate_reset`]. + /// [`initiate_reset`][`Self::initiate_reset`]. The element which causes + /// the stream reset must be passed as `barrier` and it will be sent + /// right before resetting the parser state. This way, the race condition + /// outlined in the [`xmlstream`][`self`] module's documentation is + /// guaranteed to be avoided. + /// + /// Note that you should not send the element passed as `barrier` down the + /// stream yourself, as this function takes care of it. + /// + /// # Stream resets without a triggering element + /// + /// These are not possible to do safely and not specified in RFC 6120, + /// hence they cannot be done in [`XmlStream`]. /// /// # Panics /// @@ -228,8 +260,9 @@ impl XmlStream /// /// In addition, attempting to reset a stream which has been closed by /// either side or which has had an I/O error will also cause a panic. - pub async fn accept_reset(self) -> io::Result> { + pub async fn accept_reset(mut self, barrier: &T) -> io::Result> { self.assert_retypable(); + self.send(barrier).await?; let mut stream = self.inner; Pin::new(&mut stream).reset_state(); diff --git a/tokio-xmpp/src/xmlstream/tests.rs b/tokio-xmpp/src/xmlstream/tests.rs index 2ac5cb6b3b9ca573fcf81b2c52ec6ec919c1d148..2270547ccb0e18b66bb7117dd25aa894f5011139 100644 --- a/tokio-xmpp/src/xmlstream/tests.rs +++ b/tokio-xmpp/src/xmlstream/tests.rs @@ -220,16 +220,15 @@ async fn test_exchange_data_stream_reset_and_shutdown() { let mut stream = stream .send_features::(&StreamFeatures::default()) .await?; - stream - .send(&Data { - contents: "world!".to_owned(), - }) - .await?; match stream.next().await { Some(Ok(Data { contents })) => assert_eq!(contents, "hello"), other => panic!("unexpected stream message: {:?}", other), } - let stream = stream.accept_reset().await?; + let stream = stream + .accept_reset(&Data { + contents: "world!".to_owned(), + }) + .await?; assert_eq!(stream.header().from.unwrap(), "client"); assert_eq!(stream.header().to.unwrap(), "server"); assert_eq!(stream.header().id.unwrap(), "client-id");