From 960fd782bd0295c4796118a366b7e32dc1ed0e61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Sch=C3=A4fer?= Date: Sun, 18 Aug 2024 10:49:29 +0200 Subject: [PATCH] xmlstream: fix clean shutdown sequence Without the early return in XmlStream::poll_next in case of the stream footer, the read state gets recreated and the logic at the top of that function to actually handle stream shutdown gracefully is never triggered. Also that logic was incorrect; the correct behaviour is to wait for the true EOF. --- tokio-xmpp/src/xmlstream/common.rs | 3 ++- tokio-xmpp/src/xmlstream/mod.rs | 28 +++++++++++++++++++++++----- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/tokio-xmpp/src/xmlstream/common.rs b/tokio-xmpp/src/xmlstream/common.rs index c0969564c0d47a896a4c12456ec701c61b84beee..2ea0526b930f738b7c052c9bf3420ab530f5e95c 100644 --- a/tokio-xmpp/src/xmlstream/common.rs +++ b/tokio-xmpp/src/xmlstream/common.rs @@ -460,13 +460,14 @@ impl ReadXsoState { *self = ReadXsoState::Done; return Poll::Ready(Err(io::Error::new( io::ErrorKind::InvalidData, - "end of parent element before XSO started", + "eof before XSO started", ) .into())); } } } ReadXsoState::Parsing(builder) => { + log::trace!("ReadXsoState::Parsing ev = {:?}", ev); let Some(ev) = ev else { *self = ReadXsoState::Done; return Poll::Ready(Err(io::Error::new( diff --git a/tokio-xmpp/src/xmlstream/mod.rs b/tokio-xmpp/src/xmlstream/mod.rs index 04af83c493b0d8a28e522011ac75870d853efa13..a4d88a6aec7aba46e150a9284fde8beb6c99d040 100644 --- a/tokio-xmpp/src/xmlstream/mod.rs +++ b/tokio-xmpp/src/xmlstream/mod.rs @@ -321,7 +321,14 @@ impl Stream for XmlStream, cx: &mut Context<'_>) -> Poll> { let this = self.project(); let result = match this.read_state.as_mut() { - None => return Poll::Ready(Some(Err(ReadError::StreamFooterReceived))), + None => { + // awaiting eof. + return match ready!(this.inner.poll_next(cx)) { + None => Poll::Ready(None), + Some(Ok(_)) => unreachable!("xml parser allowed data after stream footer"), + Some(Err(e)) => Poll::Ready(Some(Err(ReadError::HardError(e)))), + }; + } Some(read_state) => ready!(read_state.poll_advance(this.inner, cx)), }; let result = match result { @@ -330,7 +337,9 @@ impl Stream for XmlStream Poll::Ready(Some(Err(ReadError::ParseError(e)))), Err(ReadXsoError::Footer) => { *this.read_state = None; - Poll::Ready(Some(Err(ReadError::StreamFooterReceived))) + // Return early here, because we cannot allow recreation of + // another read state. + return Poll::Ready(Some(Err(ReadError::StreamFooterReceived))); } }; *this.read_state = Some(ReadXsoState::default()); @@ -367,17 +376,26 @@ impl<'x, Io: AsyncWrite, T: FromXml + AsXml + fmt::Debug> Sink<&'x T> for XmlStr match ready!(this.inner.as_mut().poll_ready(cx)) .and_then(|_| this.inner.as_mut().start_send(Item::ElementFoot)) { - Ok(()) => (), + Ok(()) => { + log::trace!("stream footer sent successfully"); + } // If it fails, we fail the sink immediately. Err(e) => { + log::debug!( + "omitting stream footer: failed to make stream ready: {}", + e + ); *this.write_state = WriteState::Failed; return Poll::Ready(Err(e)); } } *this.write_state = WriteState::FooterSent; } - // Footer sent => just poll the inner sink for closure. - WriteState::FooterSent => break, + // Footer sent => just poll the inner sink for flush. + WriteState::FooterSent => { + ready!(this.inner.as_mut().poll_flush(cx)?); + break; + } WriteState::Failed => unreachable!(), // caught by check_ok() } }