tokio_xmpp: implement high-level, resilient stanza stream

Jonas Schäfer created

Change summary

parsers/src/sm.rs                                |  33 
parsers/src/stream_features.rs                   |   4 
tokio-xmpp/src/client/login.rs                   |  20 
tokio-xmpp/src/client/mod.rs                     |   2 
tokio-xmpp/src/lib.rs                            |   1 
tokio-xmpp/src/stanzastream/connected.rs         | 836 ++++++++++++++++++
tokio-xmpp/src/stanzastream/error.rs             |  16 
tokio-xmpp/src/stanzastream/mod.rs               | 268 +++++
tokio-xmpp/src/stanzastream/negotiation.rs       | 534 +++++++++++
tokio-xmpp/src/stanzastream/queue.rs             | 356 +++++++
tokio-xmpp/src/stanzastream/stream_management.rs | 256 +++++
tokio-xmpp/src/stanzastream/worker.rs            | 612 +++++++++++++
tokio-xmpp/src/xmlstream/mod.rs                  |   7 
tokio-xmpp/src/xmlstream/xmpp.rs                 |   6 
14 files changed, 2,943 insertions(+), 8 deletions(-)

Detailed changes

parsers/src/sm.rs

@@ -180,6 +180,39 @@ impl From<HandledCountTooHigh> for crate::stream_error::StreamError {
     }
 }
 
+/// Enum which allows parsing/serialising any XEP-0198 element.
+#[derive(FromXml, AsXml, PartialEq, Debug, Clone)]
+#[xml()]
+pub enum Nonza {
+    /// Request to enable SM
+    #[xml(transparent)]
+    Enable(Enable),
+
+    /// Successful SM enablement response
+    #[xml(transparent)]
+    Enabled(Enabled),
+
+    /// Request to resume SM
+    #[xml(transparent)]
+    Resume(Resume),
+
+    /// Successful SM resumption response
+    #[xml(transparent)]
+    Resumed(Resumed),
+
+    /// Error response
+    #[xml(transparent)]
+    Failed(Failed),
+
+    /// Acknowledgement
+    #[xml(transparent)]
+    Ack(A),
+
+    /// Request for an acknowledgement
+    #[xml(transparent)]
+    Req(R),
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

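The new `Nonza` enum makes it possible to handle every XEP-0198 element with a single match. A minimal sketch of such a dispatcher (not part of the patch; it assumes the counters are `u32`, as in XEP-0198, and `inbound_ctr` is an illustrative parameter):

    use xmpp_parsers::sm;

    /// Sketch: answer <r/> with <a/>, note acks, leave the rest alone.
    fn handle_nonza(nonza: sm::Nonza, inbound_ctr: u32) -> Option<sm::Nonza> {
        match nonza {
            // The peer asks how much we have received: answer with our counter.
            sm::Nonza::Req(sm::R) => Some(sm::Nonza::Ack(sm::A { h: inbound_ctr })),
            // The peer confirms receipt of our stanzas up to `h`; a real
            // implementation would drop them from its retransmission queue.
            sm::Nonza::Ack(sm::A { h: _acked }) => None,
            // enable/enabled/resume/resumed/failed belong to stream
            // negotiation and are handled by that state machine instead.
            _ => None,
        }
    }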
parsers/src/stream_features.rs

@@ -43,6 +43,10 @@ pub struct StreamFeatures {
     #[xml(child(default))]
     pub sasl_cb: Option<SaslChannelBinding>,
 
+    /// Stream management feature
+    #[xml(child(default))]
+    pub stream_management: Option<crate::sm::StreamManagement>,
+
     /// Other stream features advertised
     ///
     /// If some features you use end up here, you may want to contribute

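With the new field, XEP-0198 support can be read straight off the parsed stream features, which is what the negotiation code below does. A trivial sketch:

    use xmpp_parsers::stream_features::StreamFeatures;

    /// True if the server advertised stream management
    /// (an <sm/> child in <stream:features/>).
    fn supports_sm(features: &StreamFeatures) -> bool {
        features.stream_management.is_some()
    }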
tokio-xmpp/src/client/login.rs

@@ -103,14 +103,13 @@ pub async fn auth<S: AsyncBufRead + AsyncWrite + Unpin>(
     Err(AuthError::NoMechanism.into())
 }
 
-/// Log into an XMPP server as a client with a jid+pass
-/// does channel binding if supported
-pub async fn client_login<C: ServerConnector>(
+/// Authenticate to an XMPP server, but do not bind a resource.
+pub async fn client_auth<C: ServerConnector>(
     server: C,
     jid: Jid,
     password: String,
     timeouts: Timeouts,
-) -> Result<(Option<FullJid>, StreamFeatures, XmppStream<C::Stream>), Error> {
+) -> Result<(StreamFeatures, XmppStream<C::Stream>), Error> {
     let username = jid.node().unwrap().as_str();
     let password = password;
 
@@ -132,7 +131,18 @@ pub async fn client_login<C: ServerConnector>(
             id: None,
         })
         .await?;
-    let (features, mut stream) = stream.recv_features().await?;
+    Ok(stream.recv_features().await?)
+}
+
+/// Log into an XMPP server as a client with a JID and password;
+/// performs channel binding if supported.
+pub async fn client_login<C: ServerConnector>(
+    server: C,
+    jid: Jid,
+    password: String,
+    timeouts: Timeouts,
+) -> Result<(Option<FullJid>, StreamFeatures, XmppStream<C::Stream>), Error> {
+    let (features, mut stream) = client_auth(server, jid.clone(), password, timeouts).await?;
 
     // XmppStream bound to user session
     let full_jid = bind(&mut stream, &features, &jid).await?;

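This refactoring splits authentication from resource binding: `client_auth` yields an authenticated but unbound stream, and `client_login` keeps its old behaviour on top of it. Since `login` is only `pub(crate)` (see the next hunk), a caller sketch only makes sense inside the crate; a hypothetical, with error handling elided:

    // Crate-internal sketch (`login` is pub(crate)): authenticate
    // without binding a resource.
    async fn authenticate_only<C: crate::connect::ServerConnector>(
        server: C,
        jid: xmpp_parsers::jid::Jid,
        password: String,
        timeouts: crate::xmlstream::Timeouts,
    ) -> Result<(), crate::Error> {
        let (features, stream) =
            crate::client::login::client_auth(server, jid, password, timeouts).await?;
        // `stream` is authenticated but unbound; the stanzastream worker
        // performs resource binding itself during negotiation.
        let _ = (features, stream);
        Ok(())
    }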
tokio-xmpp/src/client/mod.rs

@@ -17,7 +17,7 @@ use crate::connect::StartTlsServerConnector;
 use crate::connect::TcpServerConnector;
 
 mod bind;
-mod login;
+pub(crate) mod login;
 mod stream;
 
 /// XMPP client connection and state

tokio-xmpp/src/lib.rs

@@ -49,6 +49,7 @@ compile_error!(
 mod event;
 pub use event::{Event, Stanza};
 pub mod connect;
+pub mod stanzastream;
 pub mod xmlstream;
 
 mod client;

tokio-xmpp/src/stanzastream/connected.rs

@@ -0,0 +1,836 @@
+// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+use core::future::Future;
+use core::ops::ControlFlow::{Break, Continue};
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use std::io;
+
+use futures::{ready, Sink, Stream};
+
+use xmpp_parsers::{
+    jid::Jid,
+    sm,
+    stream_error::{DefinedCondition, SentStreamError, StreamError},
+    stream_features::StreamFeatures,
+};
+
+use crate::xmlstream::{ReadError, XmppStreamElement};
+use crate::Stanza;
+
+use super::negotiation::{NegotiationResult, NegotiationState};
+use super::queue::{QueueEntry, StanzaState, TransmitQueue};
+use super::stream_management::*;
+use super::worker::{WorkerEvent, XmppStream, LOCAL_SHUTDOWN_TIMEOUT};
+
+#[derive(PartialEq)]
+pub(super) enum RxShutdownState {
+    AwaitingFooter,
+    AwaitingEof,
+    Done,
+}
+
+fn local_error_for_stream_error(
+    io_error: &mut Option<io::Error>,
+    stream_error: &mut Option<StreamError>,
+) -> io::Error {
+    io_error
+        .take()
+        .or_else(|| {
+            stream_error
+                .take()
+                .map(|x| io::Error::new(io::ErrorKind::InvalidData, SentStreamError(x)))
+        })
+        .unwrap_or_else(|| {
+            io::Error::new(
+                io::ErrorKind::InvalidData,
+                "unknown local stream error generated",
+            )
+        })
+}
+
+/// Substate of the [`BackendStream::Connected`] state.
+///
+/// Having the substate and its logic in a separate type allows us to
+/// circumvent problems with moving data out of `&mut _` when transitioning
+/// between substates.
+pub(super) enum ConnectedState {
+    /// The stream is still being negotiated.
+    Negotiating {
+        /// Current state within the negotiations
+        substate: NegotiationState,
+    },
+
+    /// The stream is ready for transceiving.
+    Ready {
+        /// Stream management state, if any.
+        sm_state: Option<SmState>,
+    },
+
+    SendStreamError {
+        /// Stream error to send.
+        ///
+        /// `None` implies that we now only need to flush.
+        stream_error: Option<StreamError>,
+
+        /// I/O error to return to the caller once the flush is done.
+        ///
+        /// If `None`, an error will be synthesised.
+        io_error: Option<io::Error>,
+
+        /// Deadline until which the error must've been sent and the stream
+        /// must've been shut down.
+        deadline: Pin<Box<tokio::time::Sleep>>,
+    },
+
+    Failed {
+        error: Option<io::Error>,
+        sm_state: Option<SmState>,
+    },
+
+    /// A stream shutdown was initiated locally and we are flushing RX and TX
+    /// queues.
+    LocalShutdown {
+        /// Keep track of whether we have closed the TX side yet.
+        tx_closed: bool,
+
+        /// Keep track of how far the receiving side has shut down.
+        rx_state: RxShutdownState,
+
+        /// Deadline until which graceful shutdown must complete; if the
+        /// deadline is exceeded (i.e. the contained Sleep future returns
+        /// ready), the streams will be dropped (and thus closed by the OS).
+        deadline: Pin<Box<tokio::time::Sleep>>,
+    },
+
+    /// The remote side closed the stream.
+    RemoteShutdown {
+        /// Keep the SM state for later resumption.
+        sm_state: Option<SmState>,
+    },
+
+    /// Local shutdown has completed; this is a final state, as local shutdown
+    /// signals an intent of stopping the stream forever.
+    LocalShutdownComplete,
+}
+
+/// Enumeration of events happening while the stream has finished the
+/// connection procedures and is established.
+pub(super) enum ConnectedEvent {
+    /// Event generated by the stream worker.
+    Worker(WorkerEvent),
+
+    /// The remote closed the stream orderly.
+    RemoteShutdown { sm_state: Option<SmState> },
+
+    /// We got disconnected through an error, either an I/O error
+    /// or some kind of stream error.
+    Disconnect {
+        /// Stream management state for later resumption attempts.
+        sm_state: Option<SmState>,
+
+        /// The error which caused the disconnect. This is generally not `None`,
+        /// but we cannot prove this at compile time because we have to take
+        /// the error from a mutable (i.e. non-owned) place.
+        error: Option<io::Error>,
+    },
+
+    /// A shutdown was requested by the local side of the stream.
+    LocalShutdownRequested,
+}
+
+impl ConnectedState {
+    fn to_stream_error_state(&mut self, stream_error: StreamError) {
+        *self = Self::SendStreamError {
+            stream_error: Some(stream_error),
+            io_error: None,
+            deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
+        };
+    }
+
+    fn to_failed_state(&mut self, error: io::Error, sm_state: Option<SmState>) {
+        *self = Self::Failed {
+            error: Some(error),
+            sm_state,
+        };
+    }
+
+    fn poll_write_sm_req(
+        mut sm_state: Option<&mut SmState>,
+        mut stream: Pin<&mut XmppStream>,
+        cx: &mut Context<'_>,
+    ) -> Poll<io::Result<()>> {
+        if let Some(sm_state) = sm_state.as_mut() {
+            // Request is pending.
+            if sm_state.pending_req {
+                match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
+                    stream.as_mut(),
+                    cx,
+                )) {
+                    Ok(()) => (),
+                    Err(e) => return Poll::Ready(Err(e)),
+                }
+                match stream
+                    .as_mut()
+                    .start_send(&XmppStreamElement::SM(sm::Nonza::Req(sm::R)))
+                {
+                    Ok(()) => (),
+                    Err(e) => {
+                        // As the stream promised we would be able to
+                        // send, this must be a problem with our
+                        // (locally generated) nonza, i.e. this is
+                        // fatal.
+                        panic!("Failed to send SM Req nonza: {}", e);
+                    }
+                }
+                sm_state.pending_req = false;
+            }
+        }
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_writes_inner(
+        mut sm_state: Option<&mut SmState>,
+        mut stream: Pin<&mut XmppStream>,
+        transmit_queue: &mut TransmitQueue<QueueEntry>,
+        cx: &mut Context<'_>,
+    ) -> Poll<io::Result<()>> {
+        let mut depleted = false;
+
+        // We prefer sending SM reqs before actual data.
+        // SM requests are used in response to soft timeouts as a way to
+        // trigger the remote side to send *something* to us. We may be
+        // sending a lot of data in bulk right now without ever expecting a
+        // response (or at least not anytime soon). In order to ensure that
+        // the server will in fact send a message to us soon, we have to send
+        // the SM request before anything else.
+        //
+        // Example scenario: Sending a bunch of MAM `<message/>`s over a slow,
+        // but low-latency link. The MAM response only triggers a response
+        // from the peer when everything has been transmitted, which may be
+        // longer than the stream timeout.
+        ready!(Self::poll_write_sm_req(
+            match sm_state {
+                None => None,
+                Some(ref mut v) => Some(v),
+            },
+            stream.as_mut(),
+            cx
+        ))?;
+
+        let mut transmitted = false;
+        // We prefer sending actual data before stream-management ACKs.
+        // While the other side may be waiting for our ACK, we are not obliged
+        // to send it straight away (XEP-0198 explicitly allows us to delay it
+        // for some implementation-defined time if we have stuff to send. Our
+        // implementation-defined time is "infinity"), so we try to make
+        // progress on real data.
+        loop {
+            // If either the queue has nothing for us or the stream isn't
+            // ready to send, we break out of the loop. We don't use ready!
+            // here because we may have SM ACKs to send.
+            let next = match transmit_queue.poll_next(cx) {
+                Poll::Ready(Some(v)) => v,
+                Poll::Ready(None) => {
+                    // The transmit_queue is empty, so we set `depleted` to
+                    // true in order to ensure that we return Ready if all SM
+                    // acks also have been transmitted.
+                    depleted = true;
+                    break;
+                }
+                Poll::Pending => break,
+            };
+            // If the stream isn't ready to send, none of the other things can
+            // be sent either, so we can use ready!.
+            match ready!(<XmppStream as Sink<&Stanza>>::poll_ready(
+                stream.as_mut(),
+                cx
+            )) {
+                Ok(()) => (),
+                Err(e) => return Poll::Ready(Err(e)),
+            }
+            // We now either send the item or "die trying". It must
+            // be removed from the queue, because even if it fails to
+            // serialise, we don't want to reattempt sending it
+            // (unless by SM resumption retransmission).
+            let next = next.take();
+            match stream.as_mut().start_send(&next.stanza) {
+                Ok(()) => {
+                    if let Some(sm_state) = sm_state.as_mut() {
+                        sm_state.enqueue(next);
+                    }
+                    transmitted = true;
+                }
+                // Serialisation error, report back to the queue item.
+                Err(e) => {
+                    next.token
+                        .send_replace(StanzaState::Failed { error: e.into() });
+                }
+            }
+        }
+
+        if let Some(sm_state) = sm_state.as_mut() {
+            // We can set it to transmitted directly, because it has been
+            // cleared by the previous call to poll_write_sm_req.
+            sm_state.pending_req = transmitted;
+            ready!(Self::poll_write_sm_req(Some(sm_state), stream.as_mut(), cx))?;
+        }
+
+        // Now, if the stream will let us and we need to, we can tack
+        // on some SM ACKs.
+        if let Some(sm_state) = sm_state {
+            while sm_state.pending_acks > 0 {
+                match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
+                    stream.as_mut(),
+                    cx,
+                )) {
+                    Ok(()) => (),
+                    Err(e) => return Poll::Ready(Err(e)),
+                }
+                match stream
+                    .as_mut()
+                    .start_send(&XmppStreamElement::SM(sm::Nonza::Ack(sm::A {
+                        h: sm_state.inbound_ctr(),
+                    }))) {
+                    Ok(()) => (),
+                    Err(e) => {
+                        // As the stream promised we would be able to
+                        // send, this must be a problem with our
+                        // (locally generated) nonza, i.e. this is
+                        // fatal.
+                        panic!("Failed to send SM Ack nonza: {}", e);
+                    }
+                }
+                sm_state.pending_acks -= 1;
+            }
+        }
+
+        // If we haven't transmitted, we may also not have polled
+        // the stream for readiness or flushing. We need to do
+        // that here to ensure progress. Even if our tx queue is
+        // empty, the tx buffer may be nonempty.
+        match ready!(<XmppStream as Sink<&Stanza>>::poll_flush(
+            stream.as_mut(),
+            cx
+        )) {
+            Ok(()) => (),
+            Err(e) => return Poll::Ready(Err(e)),
+        }
+
+        // If we end up here, all data we currently have has been
+        // transmitted via the stream and the stream's tx buffers have
+        // been properly flushed.
+        if depleted {
+            // And here, we know that the transmit queue is closed,
+            // too. We return with success.
+            Poll::Ready(Ok(()))
+        } else {
+            // The transmit queue is still open, so more data could
+            // pour in to be transmitted.
+            Poll::Pending
+        }
+    }
+
+    /// Drive the stream in transmit simplex mode.
+    ///
+    /// This will block forever (i.e. return [`Poll::Pending`] without
+    /// installing a waker) if the stream is currently being negotiated.
+    /// Otherwise, it will attempt to drain the `transmit_queue`. When the
+    /// queue is empty, `Ok(())` is returned. When write errors occur, this
+    /// will also block forever.
+    ///
+    /// If nothing is to be sent, but the stream could be used for sending,
+    /// this will drive the flush part of the inner stream.
+    ///
+    /// Any errors are reported on the next call to `poll`.
+    pub fn poll_writes(
+        &mut self,
+        stream: Pin<&mut XmppStream>,
+        transmit_queue: &mut TransmitQueue<QueueEntry>,
+        cx: &mut Context<'_>,
+    ) -> Poll<()> {
+        match self {
+            Self::Ready { sm_state, .. } => match ready!(Self::poll_writes_inner(
+                sm_state.as_mut(),
+                stream,
+                transmit_queue,
+                cx
+            )) {
+                Ok(()) => Poll::Ready(()),
+                Err(e) => {
+                    *self = Self::Failed {
+                        error: Some(e),
+                        sm_state: sm_state.take(),
+                    };
+                    Poll::Pending
+                }
+            },
+
+            _ => Poll::Pending,
+        }
+    }
+
+    /// Drive the stream in full-duplex mode.
+    ///
+    /// Stanzas from the `transmit_queue` are transmitted once the stream is
+    /// ready.
+    ///
+    /// Returns:
+    /// - Poll::Pending if it blocks on the inner stream
+    /// - Poll::Ready(None) if it needs to be called again for a proper result
+    /// - Poll::Ready(Some(.)) when it has a proper result
+    pub fn poll(
+        &mut self,
+        mut stream: Pin<&mut XmppStream>,
+        jid: &Jid,
+        features: &StreamFeatures,
+        transmit_queue: &mut TransmitQueue<QueueEntry>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<ConnectedEvent>> {
+        match self {
+            Self::Negotiating { ref mut substate } => {
+                match ready!(substate.advance(stream, jid, transmit_queue, cx)) {
+                    Break(NegotiationResult::Disconnect { sm_state, error }) => {
+                        self.to_failed_state(error, sm_state);
+                        Poll::Ready(None)
+                    }
+                    Break(NegotiationResult::StreamReset {
+                        sm_state,
+                        bound_jid,
+                    }) => {
+                        *self = Self::Ready { sm_state };
+                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Reset {
+                            bound_jid,
+                            features: features.clone(),
+                        })))
+                    }
+                    Break(NegotiationResult::StreamResumed { sm_state }) => {
+                        *self = Self::Ready {
+                            sm_state: Some(sm_state),
+                        };
+                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Resumed)))
+                    }
+                    Break(NegotiationResult::StreamError { error }) => {
+                        self.to_stream_error_state(error);
+                        Poll::Ready(None)
+                    }
+                    Continue(Some(stanza)) => {
+                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Stanza(stanza))))
+                    }
+                    Continue(None) => Poll::Ready(None),
+                }
+            }
+
+            Self::SendStreamError {
+                ref mut stream_error,
+                ref mut io_error,
+                ref mut deadline,
+            } => {
+                match stream.as_mut().poll_next(cx) {
+                    Poll::Pending
+                    | Poll::Ready(None)
+                    | Poll::Ready(Some(Err(ReadError::StreamFooterReceived)))
+                    | Poll::Ready(Some(Err(ReadError::SoftTimeout))) => (),
+                    Poll::Ready(Some(Ok(ev))) => {
+                        log::trace!("Discarding incoming data while sending stream error: {ev:?}")
+                    }
+                    Poll::Ready(Some(Err(ReadError::ParseError(e)))) => {
+                        log::trace!("Ignoring parse error while sending stream error: {e}")
+                    }
+                    Poll::Ready(Some(Err(ReadError::HardError(e)))) => {
+                        log::warn!("I/O error while sending stream error: {e}")
+                    }
+                }
+
+                match deadline.as_mut().poll(cx) {
+                    Poll::Pending => (),
+                    Poll::Ready(()) => {
+                        log::debug!("Timeout while sending stream error. Discarding state.");
+                        let error = local_error_for_stream_error(io_error, stream_error);
+                        self.to_failed_state(error, None);
+                        return Poll::Ready(None);
+                    }
+                }
+
+                // Cannot use ready! here because we have to consider the
+                // case where the other side is refusing to accept data
+                // because its outgoing buffer is too full.
+                if stream_error.is_some() {
+                    match ready!(<XmppStream as Sink<&StreamError>>::poll_ready(
+                        stream.as_mut(),
+                        cx
+                    ))
+                    .and_then(|()| {
+                        // The take serves as transition to the next state.
+                        let stream_error = stream_error.take().unwrap();
+                        let result = stream.as_mut().start_send(&stream_error);
+                        *io_error = Some(local_error_for_stream_error(
+                            io_error,
+                            &mut Some(stream_error),
+                        ));
+                        result
+                    }) {
+                        Ok(()) => (),
+                        Err(e) => {
+                            log::debug!("Got I/O error while sending stream error: {e}. Skipping error transmission.");
+                            let error = local_error_for_stream_error(io_error, stream_error);
+                            self.to_failed_state(error, None);
+                            return Poll::Ready(None);
+                        }
+                    }
+                }
+
+                match ready!(<XmppStream as Sink<&StreamError>>::poll_flush(
+                    stream.as_mut(),
+                    cx
+                )) {
+                    Ok(()) => (),
+                    Err(e) => {
+                        log::debug!(
+                            "Got I/O error while flushing stream error: {e}. Skipping flush.",
+                        );
+                    }
+                }
+                log::trace!("Stream error send complete, transitioning to Failed state");
+                *self = Self::Failed {
+                    error: Some(local_error_for_stream_error(io_error, stream_error)),
+                    // Do *not* resume after we caused a stream error.
+                    sm_state: None,
+                };
+
+                // Request the caller to call us again to get the
+                // actual error message.
+                Poll::Ready(None)
+            }
+
+            Self::Ready { ref mut sm_state } => {
+                match Self::poll_writes_inner(
+                    sm_state.as_mut(),
+                    stream.as_mut(),
+                    transmit_queue,
+                    cx,
+                ) {
+                    Poll::Pending => (),
+                    Poll::Ready(Ok(())) => {
+                        *self = Self::LocalShutdown {
+                            rx_state: RxShutdownState::AwaitingFooter,
+                            tx_closed: false,
+                            deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
+                        };
+                        return Poll::Ready(Some(ConnectedEvent::LocalShutdownRequested));
+                    }
+                    Poll::Ready(Err(e)) => {
+                        *self = Self::Failed {
+                            error: Some(e),
+                            sm_state: sm_state.take(),
+                        };
+                        return Poll::Ready(None);
+                    }
+                }
+
+                let item = ready!(stream.poll_next(cx));
+                // We switch to a TxOpen or non-connected state when we
+                // receive the stream footer, so reading `None` from the
+                // stream is always an unclean closure.
+                let item = item.unwrap_or_else(|| {
+                    Err(ReadError::HardError(io::Error::new(
+                        io::ErrorKind::UnexpectedEof,
+                        "eof before stream footer",
+                    )))
+                });
+                match item {
+                    // Easy case, we got some data.
+                    Ok(XmppStreamElement::Stanza(data)) => {
+                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Stanza(data))))
+                    }
+
+                    Ok(XmppStreamElement::SM(sm::Nonza::Ack(ack))) => {
+                        if let Some(sm_state) = sm_state {
+                            match sm_state.remote_acked(ack.h) {
+                                Ok(()) => Poll::Ready(None),
+                                Err(e) => {
+                                    log::error!(
+                                        "Failed to process <sm:a/> sent by the server: {e}",
+                                    );
+                                    self.to_stream_error_state(e.into());
+                                    return Poll::Ready(None);
+                                }
+                            }
+                        } else {
+                            log::debug!("Hmm... I got an <sm:a/> from the peer, but I don't have a stream management state. I'm gonna ignore that...");
+                            Poll::Ready(None)
+                        }
+                    }
+
+                    Ok(XmppStreamElement::SM(sm::Nonza::Req(_))) => {
+                        if let Some(sm_state) = sm_state {
+                            match sm_state.pending_acks.checked_add(1) {
+                                None => panic!("Too many pending ACKs, something is wrong."),
+                                Some(v) => sm_state.pending_acks = v,
+                            }
+                        } else {
+                            log::warn!("Got an <sm:r/> from the peer, but we don't have any stream management state. Terminating stream with an error.");
+                            self.to_stream_error_state(StreamError {
+                                condition: DefinedCondition::UnsupportedStanzaType,
+                                text: Some((
+                                    None,
+                                    "received <sm:r/>, but stream management is not enabled"
+                                        .to_owned(),
+                                )),
+                                application_specific: vec![],
+                            });
+                        }
+                        // No matter whether we "enqueued" an ACK for send or
+                        // whether we just successfully read something from
+                        // the stream, we have to request to be polled again
+                        // right away.
+                        Poll::Ready(None)
+                    }
+
+                    Ok(other) => {
+                        log::warn!(
+                            "Received unsupported stream element: {other:?}. Emitting stream error.",
+                        );
+                        self.to_stream_error_state(StreamError {
+                            condition: DefinedCondition::UnsupportedStanzaType,
+                            // TODO: figure out a good way to provide the
+                            // sender with more information.
+                            text: None,
+                            application_specific: vec![],
+                        });
+                        Poll::Ready(None)
+                    }
+
+                    // Another easy case: Soft timeouts are passed through
+                    // to the caller for handling.
+                    Err(ReadError::SoftTimeout) => {
+                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::SoftTimeout)))
+                    }
+
+                    // Parse errors are also just passed through (and will
+                    // likely cause us to send a stream error).
+                    Err(ReadError::ParseError(e)) => {
+                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::ParseError(e))))
+                    }
+
+                    // I/O errors cause the stream to be considered
+                    // broken; we drop it and send a Disconnect event with
+                    // the error embedded.
+                    Err(ReadError::HardError(e)) => {
+                        let sm_state = sm_state.take();
+                        Poll::Ready(Some(ConnectedEvent::Disconnect {
+                            sm_state,
+                            error: Some(e),
+                        }))
+                    }
+
+                    // Stream footer indicates the remote wants to shut this
+                    // stream down.
+                    // We transition into RemoteShutdown state which makes us
+                    // emit a special event until the caller takes care of it.
+                    Err(ReadError::StreamFooterReceived) => {
+                        *self = Self::RemoteShutdown {
+                            sm_state: sm_state.take(),
+                        };
+
+                        // Let us be called again immediately to emit the
+                        // notification.
+                        Poll::Ready(None)
+                    }
+                }
+            }
+
+            Self::Failed { sm_state, error } => Poll::Ready(Some(ConnectedEvent::Disconnect {
+                error: error.take(),
+                sm_state: sm_state.take(),
+            })),
+
+            Self::LocalShutdown { .. } | Self::LocalShutdownComplete => {
+                panic!("poll_next called in local shutdown");
+            }
+
+            Self::RemoteShutdown { ref mut sm_state } => {
+                Poll::Ready(Some(ConnectedEvent::RemoteShutdown {
+                    sm_state: sm_state.take(),
+                }))
+            }
+        }
+    }
+
+    pub(super) fn poll_close(
+        &mut self,
+        mut stream: Pin<&mut XmppStream>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<(), io::Error>> {
+        loop {
+            match self {
+                // User initiates shutdown by local choice.
+                // The actual shutdown is driven by the poll_read function and we
+                // only get woken up via the close_poller waker.
+                Self::Ready { .. } | Self::RemoteShutdown { .. } | Self::Negotiating { .. } => {
+                    *self = Self::LocalShutdown {
+                        rx_state: RxShutdownState::AwaitingFooter,
+                        tx_closed: false,
+                        deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
+                    };
+                }
+
+                Self::Failed { error, .. } => match error.take() {
+                    Some(error) => return Poll::Ready(Err(error)),
+                    None => return Poll::Ready(Ok(())),
+                },
+
+                // If close is called while an attempt is made to send the
+                // stream error, we abort transmission.
+                Self::SendStreamError { .. } => {
+                    log::debug!("close() called while stream error was being sent. Aborting transmission of stream error.");
+                    *self = Self::LocalShutdown {
+                        rx_state: RxShutdownState::AwaitingFooter,
+                        tx_closed: false,
+                        deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
+                    };
+                }
+
+                // Wait for local shutdown (driven by poll_read) to complete.
+                Self::LocalShutdown {
+                    ref mut deadline,
+                    ref mut rx_state,
+                    ref mut tx_closed,
+                } => {
+                    match deadline.as_mut().poll(cx) {
+                        Poll::Ready(()) => {
+                            log::debug!("Dropping stream after shutdown timeout was exceeded.");
+                            *self = Self::LocalShutdownComplete;
+                            return Poll::Ready(Ok(()));
+                        }
+                        Poll::Pending => (),
+                    }
+
+                    if !*tx_closed {
+                        // We cannot use ready! here, because we want to poll the
+                        // receiving side in parallel.
+                        match stream.as_mut().poll_shutdown(cx) {
+                            Poll::Pending => (),
+                            Poll::Ready(Ok(())) => {
+                                *tx_closed = true;
+                            }
+                            Poll::Ready(Err(e)) => {
+                                log::debug!(
+                                    "Ignoring write error during local stream shutdown: {e}"
+                                );
+                                *tx_closed = true;
+                            }
+                        }
+                    }
+
+                    match rx_state {
+                        RxShutdownState::Done => {
+                            if !*tx_closed {
+                                // poll_close() returned Poll::Pending, so we have to
+                                // return that, too.
+                                return Poll::Pending;
+                            }
+                        }
+                        // We can use ready! here because the `poll_close` has
+                        // happened already; we don't want to poll anything else
+                        // anymore.
+                        _ => loop {
+                            match ready!(stream.as_mut().poll_next(cx)) {
+                                None => {
+                                    if *rx_state != RxShutdownState::AwaitingEof {
+                                        log::debug!("Ignoring early EOF during stream shutdown.");
+                                    }
+                                    *rx_state = RxShutdownState::Done;
+                                    break;
+                                }
+                                Some(Ok(data)) => {
+                                    log::debug!("Ignoring data received on stream during local shutdown: {data:?}");
+                                }
+                                Some(Err(ReadError::SoftTimeout)) => (),
+                                Some(Err(ReadError::HardError(e))) => {
+                                    *rx_state = RxShutdownState::Done;
+                                    log::debug!("Ignoring read error during local shutdown: {e}");
+                                    break;
+                                }
+                                Some(Err(ReadError::ParseError(e))) => {
+                                    log::debug!(
+                                        "Ignoring parse error during local shutdown: {}",
+                                        e
+                                    );
+                                }
+                                Some(Err(ReadError::StreamFooterReceived)) => match rx_state {
+                                    RxShutdownState::AwaitingFooter => {
+                                        *rx_state = RxShutdownState::AwaitingEof;
+                                    }
+                                    RxShutdownState::AwaitingEof => {
+                                        unreachable!("multiple stream footers?!")
+                                    }
+                                    RxShutdownState::Done => unreachable!(),
+                                },
+                            }
+                        },
+                    }
+
+                    if *tx_closed && *rx_state == RxShutdownState::Done {
+                        // Now that everything is properly cleaned up on the
+                        // xmlstream layer, we go through with closure.
+                        ready!(<XmppStream as Sink<&Stanza>>::poll_close(
+                            stream.as_mut(),
+                            cx
+                        ))?;
+                        // And now that's done, we can finally call it a day.
+                        *self = Self::LocalShutdownComplete;
+                        return Poll::Ready(Ok(()));
+                    } else {
+                        return Poll::Pending;
+                    }
+                }
+
+                Self::LocalShutdownComplete => return Poll::Ready(Ok(())),
+            }
+        }
+    }
+
+    pub(super) fn start_send_stream_error(&mut self, error: StreamError) {
+        match self {
+            Self::LocalShutdownComplete
+            | Self::LocalShutdown { .. }
+            | Self::RemoteShutdown { .. }
+            | Self::Failed { .. } => {
+                log::debug!("Request to send stream error ({error}), but we are already shutting down or have already failed. Discarding.");
+                return;
+            }
+
+            Self::Ready { .. } | Self::Negotiating { .. } => {}
+
+            Self::SendStreamError { .. } => {
+                log::debug!("Request to send stream error ({error}) while transmission of another stream error is already in progress. Discarding the new one.");
+                return;
+            }
+        }
+
+        *self = Self::SendStreamError {
+            deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
+            stream_error: Some(error),
+            io_error: None,
+        };
+    }
+
+    pub fn queue_sm_request(&mut self) -> bool {
+        match self {
+            Self::Ready { sm_state, .. } => {
+                if let Some(sm_state) = sm_state {
+                    sm_state.pending_req = true;
+                    true
+                } else {
+                    false
+                }
+            }
+            _ => false,
+        }
+    }
+}

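A recurring idiom in this file is `poll_ready` followed by `start_send`, with `ready!` propagating `Poll::Pending`. Extracted into a standalone helper for illustration (a sketch, not part of the patch):

    use core::pin::Pin;
    use core::task::{Context, Poll};
    use futures::{ready, Sink};

    /// Sketch: push one borrowed item through a Sink from inside poll code.
    fn poll_send_one<'a, S, T>(
        mut sink: Pin<&mut S>,
        item: &'a T,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), S::Error>>
    where
        S: Sink<&'a T>,
    {
        // Wait (and register the waker) until the sink accepts items.
        ready!(sink.as_mut().poll_ready(cx))?;
        // start_send can still fail (e.g. on serialisation); for locally
        // generated nonzas, the code above treats that as a bug and panics.
        Poll::Ready(sink.as_mut().start_send(item))
    }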
tokio-xmpp/src/stanzastream/error.rs

@@ -0,0 +1,16 @@
+// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+use xso::{AsXml, FromXml};
+
+static NS: &str = "https://xmlns.xmpp.rs/stream-errors";
+
+/// Represents a parse error.
+///
+/// Details are found in the `<text/>`.
+#[derive(FromXml, AsXml, Debug)]
+#[xml(namespace = NS, name = "parse-error")]
+pub struct ParseError;

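Per the derive attributes above, this nonza should serialise on the wire as an empty `<parse-error xmlns="https://xmlns.xmpp.rs/stream-errors"/>` element, giving the peer a machine-readable hint in the application-specific part of a stream error.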
tokio-xmpp/src/stanzastream/mod.rs

@@ -0,0 +1,268 @@
+// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+//! # Resilient stanza stream
+//!
+//! This module provides the [`StanzaStream`], which is the next level up from
+//! the low-level [`XmlStream`][`crate::xmlstream::XmlStream`].
+//!
+//! The stanza stream knows about XMPP and it most importantly knows how to
+//! fix a broken connection with a reconnect and how to do this smoothly using
+//! [XEP-0198 (Stream Management)](https://xmpp.org/extensions/xep-0198.html).
+//! XEP-0198 is only used if the peer supports it. If the peer does not
+//! support XEP-0198, automatic reconnects are still performed, but stanzas
+//! in flight at the time of the disconnect may be lost without detection.
+//!
+//! The main API entrypoint for the stanza stream is, unsurprisingly,
+//! [`StanzaStream`].
+
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use core::time::Duration;
+
+// TODO: ensure that IDs are always set on stanzas.
+
+// TODO: figure out what to do with the mpsc::Sender<QueueEntry> on lossy
+// stream reconnects. Keeping it may cause stanzas to be sent which weren't
+// meant for that stream, replacing it is racy.
+
+use futures::{SinkExt, Stream};
+
+use tokio::sync::{mpsc, oneshot};
+
+use xmpp_parsers::{jid::Jid, stream_features::StreamFeatures};
+
+use crate::connect::ServerConnector;
+use crate::xmlstream::Timeouts;
+use crate::Stanza;
+
+mod connected;
+mod error;
+mod negotiation;
+mod queue;
+mod stream_management;
+mod worker;
+
+use self::queue::QueueEntry;
+pub use self::queue::{StanzaStage, StanzaState, StanzaToken};
+pub use self::worker::{Connection, XmppStream};
+use self::worker::{StanzaStreamWorker, LOCAL_SHUTDOWN_TIMEOUT};
+
+/// Event informing about the change of the [`StanzaStream`]'s status.
+#[derive(Debug)]
+pub enum StreamEvent {
+    /// The stream was (re-)established **with** loss of state.
+    Reset {
+        /// The new JID to which the stream is bound.
+        bound_jid: Jid,
+
+        /// The features reported by the stream.
+        features: StreamFeatures,
+    },
+
+    /// The stream is currently inactive because a connection was lost.
+    ///
+    /// Resumption without loss of state is still possible. This event is
+    /// merely informative and may be used to prolong timeouts or inform the
+    /// user that the connection is currently unstable.
+    Suspended,
+
+    /// The stream was reestablished **without** loss of state.
+    ///
+    /// This is merely informative. Potentially useful to prolong timeouts.
+    Resumed,
+}
+
+/// Event emitted by the [`StanzaStream`].
+///
+/// Note that stream closure is not an explicit event, but the end of the
+/// event stream itself.
+#[derive(Debug)]
+pub enum Event {
+    /// The stream's connectivity status has changed.
+    Stream(StreamEvent),
+
+    /// A stanza was received over the stream.
+    Stanza(Stanza),
+}
+
+/// Frontend interface to a reliable, always-online stanza stream.
+pub struct StanzaStream {
+    rx: mpsc::Receiver<Event>,
+    tx: mpsc::Sender<QueueEntry>,
+}
+
+impl StanzaStream {
+    /// Establish a new client-to-server stream using the given
+    /// [`ServerConnector`].
+    ///
+    /// `jid` and `password` must be the user account's credentials. `jid` may
+    /// either be a bare JID (to let the server choose a resource) or a full
+    /// JID (to request a specific resource from the server, with no guarantee
+    /// of success).
+    ///
+    /// `timeouts` controls the responsiveness to connection interruptions
+    /// on the underlying transports. Please see the [`Timeouts`] struct's
+    /// documentation for hints on how to correctly configure this.
+    ///
+    /// The `queue_depth` controls the sizes for the incoming and outgoing
+    /// stanza queues. If the size is exceeded, the corresponding direction
+    /// will block until the queues can be flushed. Note that the respective
+    /// reverse direction is not affected (i.e. if your outgoing queue is
+    /// full for example because of a slow server, you can still receive
+    /// data).
+    pub fn new_c2s<C: ServerConnector>(
+        server: C,
+        jid: Jid,
+        password: String,
+        timeouts: Timeouts,
+        queue_depth: usize,
+    ) -> Self {
+        let reconnector = Box::new(
+            move |_preferred_location: Option<String>, slot: oneshot::Sender<Connection>| {
+                let jid = jid.clone();
+                let server = server.clone();
+                let password = password.clone();
+                tokio::spawn(async move {
+                    const MAX_DELAY: Duration = Duration::new(30, 0);
+                    let mut delay = Duration::new(1, 0);
+                    loop {
+                        log::debug!("Starting new connection as {}", jid);
+                        match crate::client::login::client_auth(
+                            server.clone(),
+                            jid.clone(),
+                            password.clone(),
+                            timeouts,
+                        )
+                        .await
+                        {
+                            Ok((features, stream)) => {
+                                log::debug!("Connection as {} established", jid);
+                                let stream = stream.box_stream();
+                                let Err(mut conn) = slot.send(Connection {
+                                    stream,
+                                    features,
+                                    identity: jid,
+                                }) else {
+                                    // Send succeeded, we're done here.
+                                    return;
+                                };
+
+                                log::debug!("StanzaStream dropped, attempting graceful termination of fresh stream.");
+                                // Send failed, i.e. the stanzastream is dead. Let's
+                                // be polite and close this stream cleanly.
+                                // We don't care whether that works, though, we
+                                // just want to release the resources after a
+                                // defined amount of time.
+                                let _: Result<_, _> = tokio::time::timeout(
+                                    LOCAL_SHUTDOWN_TIMEOUT,
+                                    <XmppStream as SinkExt<&Stanza>>::close(&mut conn.stream),
+                                )
+                                .await;
+                                return;
+                            }
+                            Err(e) => {
+                                // TODO: auth errors should probably be fatal??
+                                log::error!("Failed to connect: {}. Retrying in {:?}.", e, delay);
+                                tokio::time::sleep(delay).await;
+                                delay = delay * 2;
+                                if delay > MAX_DELAY {
+                                    delay = MAX_DELAY;
+                                }
+                            }
+                        }
+                    }
+                });
+            },
+        );
+        Self::new(reconnector, queue_depth)
+    }
+
+    /// Create a new stanza stream.
+    ///
+    /// Stanza streams operate using a `connector` which is responsible for
+    /// producing a new stream whenever necessary. It is the connector's
+    /// responsibility that:
+    ///
+    /// - It never fails to send to the channel it is given. If the connector
+    ///   drops the channel, the `StanzaStream` will consider this fatal and
+    ///   fail the stream.
+    ///
+    /// - All streams are authenticated and secured as necessary.
+    ///
+    /// - All streams are authenticated for the same entity. If the connector
+    ///   were to provide streams for different identities, information leaks
+    ///   could occur as queues from previous sessions are being flushed on
+    ///   the new stream on a reconnect.
+    ///
+    /// Most notably, the `connector` is **not** responsible for performing
+    /// resource binding: Resource binding is handled by the `StanzaStream`.
+    ///
+    /// `connector` will be called soon after `new()` was called to establish
+    /// the first underlying stream for the `StanzaStream`.
+    ///
+    /// The `queue_depth` controls the sizes for the incoming and outgoing
+    /// stanza queues. If the size is exceeded, the corresponding direction
+    /// will block until the queues can be flushed. Note that the respective
+    /// reverse direction is not affected (i.e. if your outgoing queue is
+    /// full for example because of a slow server, you can still receive
+    /// data).
+    pub fn new(
+        connector: Box<dyn FnMut(Option<String>, oneshot::Sender<Connection>) + Send + 'static>,
+        queue_depth: usize,
+    ) -> Self {
+        // c2f = core to frontend, f2c = frontend to core
+        let (f2c_tx, c2f_rx) = StanzaStreamWorker::spawn(connector, queue_depth);
+        Self {
+            tx: f2c_tx,
+            rx: c2f_rx,
+        }
+    }
+
+    async fn assert_send(&self, cmd: QueueEntry) {
+        match self.tx.send(cmd).await {
+            Ok(()) => (),
+            Err(_) => panic!("Stream closed or the stream's background workers have crashed."),
+        }
+    }
+
+    /// Close the stream.
+    ///
+    /// This will initiate a clean shutdown of the stream and will prevent and
+    /// cancel any more reconnection attempts.
+    pub async fn close(mut self) {
+        drop(self.tx); // closes stream.
+        while let Some(ev) = self.rx.recv().await {
+            log::trace!("discarding event {:?} after stream closure", ev);
+        }
+    }
+
+    /// Send a stanza via the stream.
+    ///
+    /// Note that completion of this function merely signals that the stanza
+    /// has been enqueued successfully: it may be stuck in the transmission
+    /// queue for quite a while if the stream is currently disconnected. The
+    /// transmission progress can be observed via the returned
+    /// [`StanzaToken`].
+    ///
+    /// # Panics
+    ///
+    /// If the stream has failed catastrophically (i.e. due to a software
+    /// bug), this function may panic.
+    pub async fn send(&self, stanza: Box<Stanza>) -> StanzaToken {
+        let (queue_entry, token) = QueueEntry::tracked(stanza);
+        self.assert_send(queue_entry).await;
+        token
+    }
+}
+
+impl Stream for StanzaStream {
+    type Item = Event;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        self.rx.poll_recv(cx)
+    }
+}

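Putting the new frontend together, a minimal consumer could look like the following (a sketch; it assumes `Timeouts` implements `Default` and that a `ServerConnector` has been constructed elsewhere, and it elides error handling):

    use futures::StreamExt;
    use tokio_xmpp::stanzastream::{Event, StanzaStream, StreamEvent};
    use tokio_xmpp::xmlstream::Timeouts;
    use xmpp_parsers::jid::Jid;

    async fn run<C: tokio_xmpp::connect::ServerConnector>(
        server: C,
        jid: Jid,
        password: String,
    ) {
        // queue_depth of 16 is an arbitrary illustrative choice.
        let mut stream =
            StanzaStream::new_c2s(server, jid, password, Timeouts::default(), 16);
        while let Some(ev) = stream.next().await {
            match ev {
                Event::Stream(StreamEvent::Reset { bound_jid, .. }) => {
                    log::info!("stream (re-)established as {bound_jid}");
                    // Fresh session: presence, MAM catch-up, etc. go here.
                }
                Event::Stream(ev) => log::debug!("stream status: {ev:?}"),
                Event::Stanza(stanza) => {
                    log::info!("received stanza: {stanza:?}");
                    // Replies go through send(), which returns a StanzaToken
                    // for observing transmission progress:
                    // let _token = stream.send(Box::new(reply)).await;
                }
            }
        }
        // The event stream ended: close() was called or the worker failed.
    }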
tokio-xmpp/src/stanzastream/negotiation.rs

@@ -0,0 +1,534 @@
+// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+use core::ops::ControlFlow::{self, Break, Continue};
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use std::io;
+
+use futures::{ready, Sink, Stream};
+
+use xmpp_parsers::{
+    bind::{BindQuery, BindResponse},
+    iq::{Iq, IqType},
+    jid::{FullJid, Jid},
+    sm,
+    stream_error::{DefinedCondition, StreamError},
+    stream_features::StreamFeatures,
+};
+
+use crate::xmlstream::{ReadError, XmppStreamElement};
+use crate::Stanza;
+
+use super::queue::{QueueEntry, TransmitQueue};
+use super::stream_management::*;
+use super::worker::{parse_error_to_stream_error, XmppStream};
+
+static BIND_REQ_ID: &str = "resource-binding";
+
+pub(super) enum NegotiationState {
+    /// Send request to enable or resume stream management.
+    SendSmRequest {
+        /// Stream management state to use. If present, resumption will be
+        /// attempted. Otherwise, a fresh session will be established.
+        sm_state: Option<SmState>,
+
+        /// If the stream has been freshly bound, we carry the bound JID along
+        /// with us.
+        bound_jid: Option<FullJid>,
+    },
+
+    /// Await the response to the SM enable/resume request.
+    ReceiveSmResponse {
+        /// State to use.
+        sm_state: Option<SmState>,
+
+        /// If the stream has been freshly bound, we carry the bound JID along
+        /// with us.
+        bound_jid: Option<FullJid>,
+    },
+
+    /// Send a new request to bind to a resource.
+    SendBindRequest { sm_supported: bool },
+
+    /// Receive the bind response.
+    ReceiveBindResponse { sm_supported: bool },
+}
+
+/// The ultimate result of a stream negotiation.
+pub(super) enum NegotiationResult {
+    /// An unplanned disconnect happened or a stream error was received from
+    /// the remote party.
+    Disconnect {
+        /// Stream management state for a later resumption attempt.
+        sm_state: Option<SmState>,
+
+        /// I/O error which came along the disconnect.
+        error: io::Error,
+    },
+
+    /// The negotiation completed successfully, but the stream was reset (i.e.
+    /// stream management and all session state was lost).
+    StreamReset {
+        /// Stream management state. This may still be non-None if the new
+        /// stream has successfully negotiated stream management.
+        sm_state: Option<SmState>,
+
+        /// The JID to which the stream is now bound.
+        bound_jid: Jid,
+    },
+
+    /// The negotiation completed successfully and a previous session was
+    /// resumed.
+    StreamResumed {
+        /// Negotiated stream management state.
+        sm_state: SmState,
+    },
+
+    /// The negotiation failed and we need to emit a stream error.
+    ///
+    /// **Note:** Stream errors *received* from the peer are signalled using
+    /// [`Self::Disconnect`] instead, with an I/O error of kind `Other`.
+    StreamError {
+        /// Stream error to send to the remote party with details about the
+        /// failure.
+        error: StreamError,
+    },
+}
+
+impl NegotiationState {
+    pub fn new(features: &StreamFeatures, sm_state: Option<SmState>) -> io::Result<Self> {
+        match sm_state {
+            Some(sm_state) => {
+                if features.stream_management.is_some() {
+                    return Ok(Self::SendSmRequest {
+                        sm_state: Some(sm_state),
+                        bound_jid: None,
+                    });
+                } else {
+                    log::warn!("Peer is not offering stream management anymore. Dropping state.");
+                }
+            }
+            None => (),
+        }
+
+        if !features.can_bind() {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                "Peer is not offering the bind feature. Cannot proceed with stream negotiation.",
+            ));
+        }
+
+        Ok(Self::SendBindRequest {
+            sm_supported: features.stream_management.is_some(),
+        })
+    }
+
+    fn flush(stream: Pin<&mut XmppStream>, cx: &mut Context) -> ControlFlow<io::Error, ()> {
+        match <XmppStream as Sink<&XmppStreamElement>>::poll_flush(stream, cx) {
+            Poll::Pending | Poll::Ready(Ok(())) => Continue(()),
+            Poll::Ready(Err(error)) => Break(error),
+        }
+    }
+
+    pub fn advance(
+        &mut self,
+        mut stream: Pin<&mut XmppStream>,
+        jid: &Jid,
+        transmit_queue: &mut TransmitQueue<QueueEntry>,
+        cx: &mut Context<'_>,
+    ) -> Poll<ControlFlow<NegotiationResult, Option<Stanza>>> {
+        // When sending requests, we need to wait for the stream to become
+        // ready to send and then send the corresponding request.
+        // Note that if this wasn't a fresh stream (which it always is!),
+        // doing it in this kind of simplex fashion could lead to deadlocks
+        // (because we are blockingly sending without attempting to receive: a
+        // peer could stop receiving from our side if their tx buffer was too
+        // full, for instance). However, because this stream is fresh, we know that
+        // our tx buffers are empty enough that this will succeed quickly, so
+        // that we can proceed.
+        // TODO: define a deadline for negotiation.
+        match self {
+            Self::SendBindRequest { sm_supported } => {
+                match ready!(<XmppStream as Sink<&Stanza>>::poll_ready(
+                    stream.as_mut(),
+                    cx
+                )) {
+                    // We can send.
+                    Ok(()) => (),
+
+                    // Stream broke horribly.
+                    Err(error) => {
+                        return Poll::Ready(Break(NegotiationResult::Disconnect {
+                            sm_state: None,
+                            error,
+                        }))
+                    }
+                };
+
+                let resource = jid.resource().map(|x| x.as_str().to_owned());
+                let stanza = Iq::from_set(BIND_REQ_ID, BindQuery::new(resource));
+                match stream.start_send(&stanza) {
+                    Ok(()) => (),
+                    Err(e) => panic!("failed to serialize BindQuery: {}", e),
+                };
+
+                *self = Self::ReceiveBindResponse {
+                    sm_supported: *sm_supported,
+                };
+                Poll::Ready(Continue(None))
+            }
+
+            Self::ReceiveBindResponse { sm_supported } => {
+                match Self::flush(stream.as_mut(), cx) {
+                    Break(error) => {
+                        return Poll::Ready(Break(NegotiationResult::Disconnect {
+                            sm_state: None,
+                            error,
+                        }))
+                    }
+                    Continue(()) => (),
+                }
+
+                let item = ready!(stream.poll_next(cx));
+                let item = item.unwrap_or_else(|| {
+                    Err(ReadError::HardError(io::Error::new(
+                        io::ErrorKind::UnexpectedEof,
+                        "eof before stream footer",
+                    )))
+                });
+
+                match item {
+                    Ok(XmppStreamElement::Stanza(data)) => match data {
+                        Stanza::Iq(iq) if iq.id == BIND_REQ_ID => {
+                            let error = match iq.payload {
+                                IqType::Result(Some(payload)) => {
+                                    match BindResponse::try_from(payload) {
+                                        Ok(v) => {
+                                            let bound_jid = v.into();
+                                            if *sm_supported {
+                                                *self = Self::SendSmRequest {
+                                                    sm_state: None,
+                                                    bound_jid: Some(bound_jid),
+                                                };
+                                                return Poll::Ready(Continue(None));
+                                            } else {
+                                                return Poll::Ready(Break(
+                                                    NegotiationResult::StreamReset {
+                                                        sm_state: None,
+                                                        bound_jid: Jid::from(bound_jid),
+                                                    },
+                                                ));
+                                            }
+                                        }
+                                        Err(e) => e.to_string(),
+                                    }
+                                }
+                                IqType::Result(None) => "Bind response has no payload".to_owned(),
+                                _ => "Unexpected IQ type in response to bind request".to_owned(),
+                            };
+                            log::warn!("Received IQ matching the bind request, but parsing failed ({error})! Emitting stream error.");
+                            Poll::Ready(Break(NegotiationResult::StreamError {
+                                error: StreamError {
+                                    condition: DefinedCondition::UndefinedCondition,
+                                    text: Some((None, error)),
+                                    application_specific: vec![super::error::ParseError.into()],
+                                },
+                            }))
+                        }
+                        st => {
+                            log::warn!("Received unexpected stanza before response to bind request: {st:?}. Dropping.");
+                            Poll::Ready(Continue(None))
+                        }
+                    },
+
+                    Ok(XmppStreamElement::StreamError(error)) => {
+                        log::debug!("Received stream:error, failing stream and discarding any stream management state.");
+                        let error = io::Error::new(io::ErrorKind::Other, error);
+                        transmit_queue.fail(&(&error).into());
+                        Poll::Ready(Break(NegotiationResult::Disconnect {
+                            error,
+                            sm_state: None,
+                        }))
+                    }
+
+                    Ok(other) => {
+                        log::warn!("Received unsupported stream element during bind: {other:?}. Emitting stream error.");
+                        Poll::Ready(Break(NegotiationResult::StreamError {
+                            error: StreamError {
+                                condition: DefinedCondition::UnsupportedStanzaType,
+                                text: None,
+                                application_specific: vec![],
+                            },
+                        }))
+                    }
+
+                    // Soft timeouts during negotiation are a bad sign
+                    // (because we already prompted the server to send
+                    // something and are waiting for it), but also nothing
+                    // to write home about.
+                    Err(ReadError::SoftTimeout) => Poll::Ready(Continue(None)),
+
+                    // Parse errors during negotiation cause an unconditional
+                    // stream error.
+                    Err(ReadError::ParseError(e)) => {
+                        Poll::Ready(Break(NegotiationResult::StreamError {
+                            error: parse_error_to_stream_error(e),
+                        }))
+                    }
+
+                    // I/O errors cause the stream to be considered
+                    // broken; we drop it and send a Disconnect event with
+                    // the error embedded.
+                    Err(ReadError::HardError(error)) => {
+                        Poll::Ready(Break(NegotiationResult::Disconnect {
+                            sm_state: None,
+                            error,
+                        }))
+                    }
+
+                    // Stream footer during negotiation is really weird.
+                    // We kill the stream immediately with an error
+                    // (there is no SM state to preserve at this stage).
+                    Err(ReadError::StreamFooterReceived) => {
+                        Poll::Ready(Break(NegotiationResult::Disconnect {
+                            sm_state: None,
+                            error: io::Error::new(
+                                io::ErrorKind::InvalidData,
+                                "stream footer received during negotation",
+                            ),
+                        }))
+                    }
+                }
+            }
+
+            Self::SendSmRequest {
+                sm_state,
+                bound_jid,
+            } => {
+                match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
+                    stream.as_mut(),
+                    cx
+                )) {
+                    // We can send.
+                    Ok(()) => (),
+
+                    // Stream broke horribly.
+                    Err(error) => {
+                        return Poll::Ready(Break(NegotiationResult::Disconnect {
+                            sm_state: sm_state.take(),
+                            error,
+                        }))
+                    }
+                };
+
+                let nonza = if let Some((id, inbound_ctr)) =
+                    sm_state.as_ref().and_then(|x| x.resume_info())
+                {
+                    // Attempt resumption
+                    sm::Nonza::Resume(sm::Resume {
+                        h: inbound_ctr,
+                        previd: sm::StreamId(id.to_owned()),
+                    })
+                } else {
+                    // Attempt enabling
+                    sm::Nonza::Enable(sm::Enable {
+                        max: None,
+                        resume: true,
+                    })
+                };
+                match stream.start_send(&XmppStreamElement::SM(nonza)) {
+                    Ok(()) => (),
+                    Err(e) => {
+                        // We panic here, instead of returning an
+                        // error, because after we confirmed via
+                        // poll_ready that the stream is ready to
+                        // send, the only error returned by start_send
+                        // can be caused by our data.
+                        panic!("Failed to send SM nonza: {}", e);
+                    }
+                }
+
+                *self = Self::ReceiveSmResponse {
+                    sm_state: sm_state.take(),
+                    bound_jid: bound_jid.take(),
+                };
+                // Ask caller to poll us again immediately in order to
+                // start flushing the stream.
+                Poll::Ready(Continue(None))
+            }
+
+            Self::ReceiveSmResponse {
+                sm_state,
+                bound_jid,
+            } => {
+                match Self::flush(stream.as_mut(), cx) {
+                    Break(error) => {
+                        return Poll::Ready(Break(NegotiationResult::Disconnect {
+                            sm_state: sm_state.take(),
+                            error,
+                        }))
+                    }
+                    Continue(()) => (),
+                }
+
+                // So the difficulty here is that there's a possibility
+                // that we receive non-SM data while the SM negotiation
+                // is going on.
+
+                let item = ready!(stream.poll_next(cx));
+                let item = item.unwrap_or_else(|| {
+                    Err(ReadError::HardError(io::Error::new(
+                        io::ErrorKind::UnexpectedEof,
+                        "eof before stream footer",
+                    )))
+                });
+                match item {
+                    // Pre-SM data. Note that we mustn't count this while we
+                    // are still in negotiating state: we transit to
+                    // [`Self::Ready`] immediately after we got the
+                    // `<resumed/>` or `<enabled/>`, and before we got that,
+                    // counting inbound stanzas is definitely wrong (see e.g.
+                    // aioxmpp commit 796aa32).
+                    Ok(XmppStreamElement::Stanza(data)) => Poll::Ready(Continue(Some(data))),
+
+                    Ok(XmppStreamElement::SM(sm::Nonza::Enabled(enabled))) => {
+                        if sm_state.is_some() {
+                            // Okay, the peer violated the stream management
+                            // protocol here (or we have a bug).
+                            log::warn!(
+                                "Received <enabled/>, but we also have previous SM state. One of us has a bug here (us or the peer) and I'm not sure which it is. If you can reproduce this, please re-run with trace loglevel and provide the logs. Attempting to proceed with a fresh session.",
+                            );
+                        }
+                        // We must emit Reset here because this is a
+                        // fresh stream and we did not resume.
+                        Poll::Ready(Break(NegotiationResult::StreamReset {
+                            sm_state: Some(enabled.into()),
+                            bound_jid: bound_jid.take().expect("State machine error: no bound_jid available in SM negotiation.").into(),
+                        }))
+                    }
+
+                    Ok(XmppStreamElement::SM(sm::Nonza::Resumed(resumed))) => match sm_state.take()
+                    {
+                        Some(mut sm_state) => {
+                            // Yay!
+                            match sm_state.resume(resumed.h) {
+                                Ok(to_retransmit) => transmit_queue.requeue_all(to_retransmit),
+                                Err(e) => {
+                                    // We kill the stream with an error
+                                    log::error!("Resumption failed: {e}");
+                                    return Poll::Ready(Break(NegotiationResult::StreamError {
+                                        error: e.into(),
+                                    }));
+                                }
+                            }
+                            Poll::Ready(Break(NegotiationResult::StreamResumed { sm_state }))
+                        }
+                        None => {
+                            // Okay, the peer violated the stream management
+                            // protocol here (or we have a bug).
+                            // Unlike the
+                            // received-enabled-but-attempted-to-resume
+                            // situation, we do not have enough information to
+                            // proceed without having the stream break soon.
+                            // (If we proceeded without a SM state, we would
+                            // have the stream die as soon as the peer
+                            // requests our counters).
+                            // We thus terminate the stream with an error.
+                            Poll::Ready(Break(NegotiationResult::Disconnect {
+                                sm_state: None,
+                                error: io::Error::new(io::ErrorKind::InvalidData, "Peer replied to <sm:enable/> request with <sm:resumed/> response"),
+                            }))
+                        }
+                    },
+
+                    Ok(XmppStreamElement::SM(sm::Nonza::Failed(failed))) => match sm_state {
+                        Some(sm_state) => {
+                            log::debug!("Received <sm:failed/> in response to resumption request. Discarding SM data and attempting to renegotiate.");
+                            if let Some(h) = failed.h {
+                                // This is only an optimization anyway, so
+                                // we can also just ignore this.
+                                let _: Result<_, _> = sm_state.remote_acked(h);
+                            }
+                            *self = Self::SendBindRequest { sm_supported: true };
+                            Poll::Ready(Continue(None))
+                        }
+                        None => {
+                            log::warn!("Received <sm:failed/> in response to enable request. Proceeding without stream management.");
+
+                            // We must emit Reset here because this is a
+                            // fresh stream and we did not resume.
+                            Poll::Ready(Break(NegotiationResult::StreamReset {
+                                bound_jid: bound_jid.take().expect("State machine error: no bound_jid available in SM negotiation.").into(),
+                                sm_state: None,
+                            }))
+                        }
+                    },
+
+                    Ok(XmppStreamElement::StreamError(error)) => {
+                        log::debug!("Received stream error, failing stream and discarding any stream management state.");
+                        let error = io::Error::new(io::ErrorKind::Other, error);
+                        transmit_queue.fail(&(&error).into());
+                        Poll::Ready(Break(NegotiationResult::Disconnect {
+                            error,
+                            sm_state: None,
+                        }))
+                    }
+
+                    Ok(other) => {
+                        log::warn!("Received unsupported stream element during negotiation: {other:?}. Emitting stream error.");
+                        Poll::Ready(Break(NegotiationResult::StreamError {
+                            error: StreamError {
+                                condition: DefinedCondition::UnsupportedStanzaType,
+                                text: None,
+                                application_specific: vec![],
+                            },
+                        }))
+                    }
+
+                    // Soft timeouts during negotiation are a bad sign
+                    // (because we already prompted the server to send
+                    // something and are waiting for it), but also nothing
+                    // to write home about.
+                    Err(ReadError::SoftTimeout) => Poll::Ready(Continue(None)),
+
+                    // Parse errors during negotiation cause an unconditional
+                    // stream error.
+                    Err(ReadError::ParseError(e)) => {
+                        Poll::Ready(Break(NegotiationResult::StreamError {
+                            error: parse_error_to_stream_error(e),
+                        }))
+                    }
+
+                    // I/O errors cause the stream to be considered
+                    // broken; we drop it and send a Disconnect event with
+                    // the error embedded.
+                    Err(ReadError::HardError(error)) => {
+                        Poll::Ready(Break(NegotiationResult::Disconnect {
+                            sm_state: sm_state.take(),
+                            error,
+                        }))
+                    }
+
+                    // Stream footer during negotiation is really weird.
+                    // We kill the stream immediately with an error
+                    // (but allow preservation of the SM state).
+                    Err(ReadError::StreamFooterReceived) => {
+                        Poll::Ready(Break(NegotiationResult::Disconnect {
+                            sm_state: sm_state.take(),
+                            error: io::Error::new(
+                                io::ErrorKind::InvalidData,
+                                "stream footer received during negotation",
+                            ),
+                        }))
+                    }
+                }
+            }
+        }
+    }
+}
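
For illustration: the bind request which `SendBindRequest` serialises is a
plain IQ of type `set` carrying a `BindQuery`. A minimal standalone sketch of
constructing the same stanza (`BIND_REQ_ID` is private to the module, so a
stand-in id is used here):

    use xmpp_parsers::{bind::BindQuery, iq::Iq};

    fn main() {
        // Stand-in for the module-private BIND_REQ_ID constant.
        let id = "bind-example";
        // `None` asks the server to assign the resource; `Some(...)` would
        // request a specific one, mirroring the jid.resource() handling in
        // advance() above.
        let iq = Iq::from_set(id, BindQuery::new(None));
        println!("{:?}", iq);
    }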

tokio-xmpp/src/stanzastream/queue.rs ๐Ÿ”—

@@ -0,0 +1,356 @@
+// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+use core::cmp::Ordering;
+use core::fmt;
+use core::task::{Context, Poll};
+use std::collections::VecDeque;
+use std::io;
+
+use futures::ready;
+
+use tokio::sync::{mpsc, watch};
+
+use crate::Stanza;
+
+#[derive(Debug, Clone)]
+pub struct OpaqueIoError {
+    kind: io::ErrorKind,
+    message: String,
+}
+
+impl OpaqueIoError {
+    pub fn kind(&self) -> io::ErrorKind {
+        self.kind
+    }
+
+    pub fn into_io_error(self) -> io::Error {
+        io::Error::new(self.kind, self.message)
+    }
+
+    pub fn to_io_error(&self) -> io::Error {
+        io::Error::new(self.kind, self.message.clone())
+    }
+}
+
+impl From<io::Error> for OpaqueIoError {
+    fn from(other: io::Error) -> Self {
+        <Self as From<&io::Error>>::from(&other)
+    }
+}
+
+impl From<&io::Error> for OpaqueIoError {
+    fn from(other: &io::Error) -> Self {
+        Self {
+            kind: other.kind(),
+            message: other.to_string(),
+        }
+    }
+}
+
+impl fmt::Display for OpaqueIoError {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.write_str(&self.message)
+    }
+}
+
+impl core::error::Error for OpaqueIoError {}
+
+/// The five stages of stanza transmission.
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
+pub enum StanzaStage {
+    /// The stanza is in the transmit queue, but has not been serialised or
+    /// sent to the stream yet.
+    Queued,
+
+    /// The stanza was successfully serialised and put into the transmit
+    /// buffers.
+    Sent,
+
+    /// The stanza has been acked by the peer using XEP-0198 or comparable
+    /// means.
+    ///
+    /// **Note:** This state is only ever reached on streams where XEP-0198
+    /// was successfully negotiated.
+    Acked,
+
+    /// Stanza transmission or serialisation failed.
+    Failed,
+
+    /// The stanza was dropped from the transmit queue before it could be
+    /// sent.
+    ///
+    /// This may happen if the stream breaks in a fatal, panicky way.
+    Dropped,
+}
+
+impl From<&StanzaState> for StanzaStage {
+    fn from(other: &StanzaState) -> Self {
+        match other {
+            StanzaState::Queued => Self::Queued,
+            StanzaState::Sent { .. } => Self::Sent,
+            StanzaState::Acked { .. } => Self::Acked,
+            StanzaState::Failed { .. } => Self::Failed,
+            StanzaState::Dropped => Self::Dropped,
+        }
+    }
+}
+
+impl PartialEq<StanzaStage> for StanzaState {
+    fn eq(&self, other: &StanzaStage) -> bool {
+        StanzaStage::from(self).eq(other)
+    }
+}
+
+impl PartialEq<StanzaState> for StanzaStage {
+    fn eq(&self, other: &StanzaState) -> bool {
+        self.eq(&Self::from(other))
+    }
+}
+
+impl PartialOrd<StanzaStage> for StanzaState {
+    fn partial_cmp(&self, other: &StanzaStage) -> Option<Ordering> {
+        StanzaStage::from(self).partial_cmp(other)
+    }
+}
+
+impl PartialOrd<StanzaState> for StanzaStage {
+    fn partial_cmp(&self, other: &StanzaState) -> Option<Ordering> {
+        self.partial_cmp(&Self::from(other))
+    }
+}
+
+/// State of a stanza in transit to the peer.
+#[derive(Debug, Clone)]
+pub enum StanzaState {
+    /// The stanza has been enqueued in the local queue but not sent yet.
+    Queued,
+
+    /// The stanza has been sent to the server, but there is no proof that it
+    /// has been received by the server yet.
+    Sent {
+        /*
+        /// The time from when the stanza was enqueued until the time it was
+        /// sent on the stream.
+        queue_delay: Duration,
+        */
+    },
+
+    /// Confirmation that the stanza has been seen by the server has been
+    /// received.
+    Acked {
+        /*
+        /// The time from when the stanza was enqueued until the time it was
+        /// sent on the stream.
+        queue_delay: Duration,
+
+        /// The time between sending the stanza on the stream and receiving
+        /// confirmation from the server.
+        ack_delay: Duration,
+        */
+    },
+
+    /// Sending the stanza has failed in a non-recoverable manner.
+    Failed {
+        /// The error which caused the sending to fail.
+        error: OpaqueIoError,
+    },
+
+    /// The stanza was dropped out of the queue for unspecified reasons,
+    /// such as the stream breaking in a fatal, panicky way.
+    Dropped,
+}
+
+/// Track stanza transmission through the
+/// [`StanzaStream`][`super::StanzaStream`] up to the peer.
+#[derive(Clone)]
+pub struct StanzaToken {
+    inner: watch::Receiver<StanzaState>,
+}
+
+impl StanzaToken {
+    /// Wait for the stanza transmission to reach the given state.
+    ///
+    /// If the stanza is removed from tracking before that state is reached,
+    /// `None` is returned.
+    pub async fn wait_for(&mut self, state: StanzaStage) -> Option<StanzaState> {
+        self.inner
+            .wait_for(|st| *st >= state)
+            .await
+            .map(|x| x.clone())
+            .ok()
+    }
+
+    /// Read the current transmission state.
+    pub fn state(&self) -> StanzaState {
+        self.inner.borrow().clone()
+    }
+}
+
+pub(super) struct QueueEntry {
+    pub stanza: Box<Stanza>,
+    pub token: watch::Sender<StanzaState>,
+}
+
+impl QueueEntry {
+    pub fn untracked(st: Box<Stanza>) -> Self {
+        Self::tracked(st).0
+    }
+
+    pub fn tracked(st: Box<Stanza>) -> (Self, StanzaToken) {
+        let (tx, rx) = watch::channel(StanzaState::Queued);
+        let token = StanzaToken { inner: rx };
+        (
+            QueueEntry {
+                stanza: st,
+                token: tx,
+            },
+            token,
+        )
+    }
+}
+
+/// Reference to a transmit queue entry.
+///
+/// If the reference is dropped without calling [`Self::take`], the entry
+/// simply remains in the queue.
+pub(super) struct TransmitQueueRef<'x, T> {
+    q: &'x mut VecDeque<T>,
+}
+
+impl<'x, T> TransmitQueueRef<'x, T> {
+    /// Take the item out of the queue.
+    pub fn take(self) -> T {
+        // Unwrap: when this type is created, a check is made that the queue
+        // actually has a front item and because we borrow, that also cannot
+        // change.
+        self.q.pop_front().unwrap()
+    }
+}
+
+/// A transmit queue coupled to an [`mpsc::Receiver`].
+///
+/// The transmit queue will by default only allow one element to reside in the
+/// queue outside the inner `Receiver`: the main queueing happens inside the
+/// receiver and is governed by its queue depth and associated backpressure.
+///
+/// However, the queue does allow prepending elements to the front, which is
+/// useful for retransmitting items.
+pub(super) struct TransmitQueue<T: Unpin> {
+    inner: mpsc::Receiver<T>,
+    peek: VecDeque<T>,
+}
+
+impl<T: Unpin> TransmitQueue<T> {
+    /// Create a new transmission queue around an existing mpsc receiver.
+    pub fn wrap(ch: mpsc::Receiver<T>) -> Self {
+        Self {
+            inner: ch,
+            peek: VecDeque::with_capacity(1),
+        }
+    }
+
+    /// Create a new mpsc channel and wrap the receiving side in a
+    /// transmission queue
+    pub fn channel(depth: usize) -> (mpsc::Sender<T>, Self) {
+        let (tx, rx) = mpsc::channel(depth);
+        (tx, Self::wrap(rx))
+    }
+
+    /// Poll the queue for the next item to transmit.
+    pub fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<TransmitQueueRef<'_, T>>> {
+        if self.peek.len() > 0 {
+            // Cannot use `if let Some(.) = .` here because of a borrowchecker
+            // restriction. If the reference is created before the branch is
+            // entered, it will think it needs to be borrowed until the end
+            // of the function (and that will conflict with the mutable
+            // borrow we do for `self.peek.push_back` below).
+            // See also https://github.com/rust-lang/rust/issues/54663.
+            return Poll::Ready(Some(TransmitQueueRef { q: &mut self.peek }));
+        } else {
+            // The target size for the queue is 1, effectively acting as an
+            // Option<T>. In some cases, we need more than one, but that is
+            // always only a temporary burst (e.g. SM resumption
+            // retransmissions), so we release the memory as soon as possible
+            // after that.
+            // Even though the target size is 1, we don't want to be pedantic
+            // about this and we don't want to reallocate often. Some short
+            // bursts are ok, and given that the stanzas inside QueueEntry
+            // elements (the main use case for this type) are boxed anyway,
+            // the size of the elements is rather small.
+            if self.peek.capacity() > 32 {
+                // We do not use shrink_to here, because we are *certain* that
+                // we won't need a larger capacity any time soon, and
+                // allocators may avoid moving data around.
+                let mut new = VecDeque::new();
+                core::mem::swap(&mut self.peek, &mut new);
+            }
+        }
+        match ready!(self.inner.poll_recv(cx)) {
+            None => Poll::Ready(None),
+            Some(v) => {
+                self.peek.push_back(v);
+                Poll::Ready(Some(TransmitQueueRef { q: &mut self.peek }))
+            }
+        }
+    }
+
+    /// Requeue a sequence of items to the front of the queue.
+    ///
+    /// This function preserves ordering of the elements in `iter`, meaning
+    /// that the first item from `iter` is going to be the next item yielded
+    /// by `poll_next`.
+    pub fn requeue_all<I: IntoIterator<Item = T>>(&mut self, iter: I) {
+        let iter = iter.into_iter();
+        let to_reserve = iter.size_hint().1.unwrap_or(iter.size_hint().0);
+        self.peek.reserve(to_reserve);
+        let mut n = 0;
+        for item in iter {
+            self.peek.push_front(item);
+            n += 1;
+        }
+        // Now we need to revert the order: we pushed the elements to the
+        // front, so if we now read back from the front via poll_next, they
+        // would be read in reverse order. The following loop fixes that.
+        for i in 0..(n / 2) {
+            let j = n - (i + 1);
+            self.peek.swap(i, j);
+        }
+    }
+
+    /// Enqueues an item to be sent after all items in the *local* queue, but
+    /// *before* all items which are still inside the inner `mpsc` channel.
+    pub fn enqueue(&mut self, item: T) {
+        self.peek.push_back(item);
+    }
+
+    /// Return true if the sender side of the queue is closed.
+    ///
+    /// Note that there may still be items which can be retrieved from the
+    /// queue even though it has been closed.
+    pub fn is_closed(&self) -> bool {
+        self.inner.is_closed()
+    }
+}
+
+impl TransmitQueue<QueueEntry> {
+    /// Fail all currently queued items with the given error.
+    ///
+    /// Future items will not be affected.
+    pub fn fail(&mut self, error: &OpaqueIoError) {
+        for item in self.peek.drain(..) {
+            item.token.send_replace(StanzaState::Failed {
+                error: error.clone(),
+            });
+        }
+        while let Ok(item) = self.inner.try_recv() {
+            item.token.send_replace(StanzaState::Failed {
+                error: error.clone(),
+            });
+        }
+        self.peek.shrink_to(1);
+    }
+}
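
The index-swapping in `requeue_all` above is the subtlest part of the queue:
pushing items to the front reverses their order, and the swap loop then
reverses the freshly pushed prefix back in place. A standalone sketch of the
same trick (hypothetical free function, not the actual `TransmitQueue` API):

    use std::collections::VecDeque;

    fn requeue_all<T>(peek: &mut VecDeque<T>, iter: impl IntoIterator<Item = T>) {
        let mut n = 0;
        for item in iter {
            peek.push_front(item);
            n += 1;
        }
        // Reverse the freshly pushed prefix back into its original order.
        for i in 0..(n / 2) {
            peek.swap(i, n - (i + 1));
        }
    }

    fn main() {
        let mut q: VecDeque<i32> = VecDeque::from([4, 5]);
        requeue_all(&mut q, [1, 2, 3]);
        assert_eq!(q, VecDeque::from([1, 2, 3, 4, 5]));
    }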

tokio-xmpp/src/stanzastream/stream_management.rs ๐Ÿ”—

@@ -0,0 +1,256 @@
+// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+use core::fmt;
+use std::collections::{vec_deque, VecDeque};
+
+use xmpp_parsers::sm;
+
+use super::queue::{QueueEntry, StanzaState};
+
+#[derive(Debug)]
+pub(super) enum SmResumeInfo {
+    NotResumable,
+    Resumable {
+        /// XEP-0198 stream ID
+        id: String,
+
+        /// Preferred IP and port for resumption as indicated by the peer.
+        // TODO: pass this to the reconnection logic.
+        #[allow(dead_code)]
+        location: Option<String>,
+    },
+}
+
+/// State for stream management
+pub(super) struct SmState {
+    /// Last value seen from the remote stanza counter.
+    outbound_base: u32,
+
+    /// Counter for received stanzas
+    inbound_ctr: u32,
+
+    /// Number of `<sm:a/>` we still need to send.
+    ///
+    /// Acks cannot always be sent right away (if our tx buffer is full), and
+    /// instead of cluttering our outbound queue or something with them, we
+    /// just keep a counter of unanswered `<sm:r/>`. The stream will process
+    /// these in due time.
+    pub(super) pending_acks: usize,
+
+    /// Flag indicating that a `<sm:r/>` request should be sent.
+    pub(super) pending_req: bool,
+
+    /// Information about resumability of the stream
+    resumption: SmResumeInfo,
+
+    /// Unacked stanzas in the order they were sent
+    // We use a VecDeque here because that has better performance
+    // characteristics with the ringbuffer-type usage we're seeing here:
+    // we push stuff to the back, and then drain it from the front. Vec would
+    // have to move all the data around all the time, while VecDeque will just
+    // move some pointers around.
+    unacked_stanzas: VecDeque<QueueEntry>,
+}
+
+impl fmt::Debug for SmState {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("SmState")
+            .field("outbound_base", &self.outbound_base)
+            .field("inbound_ctr", &self.inbound_ctr)
+            .field("resumption", &self.resumption)
+            .field("len(unacked_stanzas)", &self.unacked_stanzas.len())
+            .finish()
+    }
+}
+
+#[derive(Debug)]
+pub(super) enum SmError {
+    RemoteAckedMoreStanzas {
+        local_base: u32,
+        queue_len: u32,
+        remote_ctr: u32,
+    },
+    RemoteAckWentBackwards {
+        local_base: u32,
+        // NOTE: this is not needed to fully specify the error, but it's
+        // needed to generate a `<handled-count-too-high/>` from Self.
+        queue_len: u32,
+        remote_ctr: u32,
+    },
+}
+
+impl From<SmError> for xmpp_parsers::stream_error::StreamError {
+    fn from(other: SmError) -> Self {
+        let (h, send_count) = match other {
+            SmError::RemoteAckedMoreStanzas {
+                local_base,
+                queue_len,
+                remote_ctr,
+            } => (remote_ctr, local_base.wrapping_add(queue_len)),
+            SmError::RemoteAckWentBackwards {
+                local_base,
+                queue_len,
+                remote_ctr,
+            } => (remote_ctr, local_base.wrapping_add(queue_len)),
+        };
+        xmpp_parsers::sm::HandledCountTooHigh { h, send_count }.into()
+    }
+}
+
+impl fmt::Display for SmError {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            Self::RemoteAckedMoreStanzas {
+                local_base,
+                queue_len,
+                remote_ctr,
+            } => {
+                let local_tip = local_base.wrapping_add(*queue_len);
+                write!(f, "remote acked more stanzas than we sent: remote counter = {}. queue covers range {}..<{}", remote_ctr, local_base, local_tip)
+            }
+            Self::RemoteAckWentBackwards {
+                local_base,
+                remote_ctr,
+                ..
+            } => {
+                write!(f, "remote acked less stanzas than before: remote counter = {}, local queue starts at {}", remote_ctr, local_base)
+            }
+        }
+    }
+}
+
+impl SmState {
+    /// Mark a stanza as sent and keep it in the stream management queue.
+    pub fn enqueue(&mut self, entry: QueueEntry) {
+        // This may seem like an arbitrary limit, but there's some thought
+        // in this.
+        // First, the SM counters go up to u32 at most and then wrap around.
+        // That means that any queue size larger than u32 would immediately
+        // cause ambiguities when resuming.
+        // Second, there's RFC 1982 "Serial Number Arithmetic". It is used for
+        // example in DNS for the serial number and it has thoughts on how to
+        // use counters which wrap around at some point. The document proposes
+        // that if the (wrapped) difference between two numbers is larger than
+        // half the number space, you should consider it as a negative
+        // difference.
+        //
+        // Hence the ambiguity already starts at u32::MAX / 2, so we limit the
+        // queue to one less than that.
+        const MAX_QUEUE_SIZE: usize = (u32::MAX / 2 - 1) as usize;
+        if self.unacked_stanzas.len() >= MAX_QUEUE_SIZE {
+            // We don't bother with an error return here. u32::MAX / 2 stanzas
+            // in the queue is fatal in any circumstance I can fathom (also,
+            // we have no way to return this error to the
+            // [`StanzaStream::send`] call anyway).
+            panic!("Too many pending stanzas.");
+        }
+
+        self.unacked_stanzas.push_back(entry);
+        log::trace!(
+            "Stored stanza in SmState. We are now at {} unacked stanzas.",
+            self.unacked_stanzas.len()
+        );
+    }
+
+    /// Process resumption.
+    ///
+    /// Updates the internal state according to the received remote counter.
+    /// Returns an iterator which yields the queue entries which need to be
+    /// retransmitted.
+    pub fn resume(&mut self, h: u32) -> Result<vec_deque::Drain<'_, QueueEntry>, SmError> {
+        self.remote_acked(h)?;
+        // Return the entire leftover queue. We cannot receive acks for them,
+        // unless they are retransmitted, because the peer has not seen them
+        // yet (they got lost in the previous unclean disconnect).
+        Ok(self.unacked_stanzas.drain(..))
+    }
+
+    /// Process remote `<a/>`
+    pub fn remote_acked(&mut self, h: u32) -> Result<(), SmError> {
+        log::debug!("remote_acked: {self:?}::remote_acked({h})");
+        // XEP-0198 specifies that counters are mod 2^32, which is handy when
+        // you use u32 data types :-).
+        let to_drop = h.wrapping_sub(self.outbound_base) as usize;
+        if to_drop > 0 {
+            log::trace!("remote_acked: need to drop {to_drop} stanzas");
+            if to_drop > self.unacked_stanzas.len() {
+                if to_drop as u32 > u32::MAX / 2 {
+                    // If we look at the stanza counter values as RFC 1982
+                    // values, a wrapping difference greater than half the
+                    // number space indicates a negative difference, i.e.
+                    // h went backwards.
+                    return Err(SmError::RemoteAckWentBackwards {
+                        local_base: self.outbound_base,
+                        queue_len: self.unacked_stanzas.len() as u32,
+                        remote_ctr: h,
+                    });
+                } else {
+                    return Err(SmError::RemoteAckedMoreStanzas {
+                        local_base: self.outbound_base,
+                        queue_len: self.unacked_stanzas.len() as u32,
+                        remote_ctr: h,
+                    });
+                }
+            }
+            for entry in self.unacked_stanzas.drain(..to_drop) {
+                entry.token.send_replace(StanzaState::Acked {});
+            }
+            self.outbound_base = h;
+            log::debug!("remote_acked: remote acked {to_drop} stanzas");
+            Ok(())
+        } else {
+            log::trace!("remote_acked: no stanzas to drop");
+            Ok(())
+        }
+    }
+
+    /// Get the current inbound counter.
+    #[inline(always)]
+    pub fn inbound_ctr(&self) -> u32 {
+        self.inbound_ctr
+    }
+
+    /// Get the info necessary for resumption.
+    ///
+    /// Returns the stream ID and the current inbound counter if resumption is
+    /// available and None otherwise.
+    pub fn resume_info(&self) -> Option<(&str, u32)> {
+        match self.resumption {
+            SmResumeInfo::Resumable { ref id, .. } => Some((id, self.inbound_ctr)),
+            SmResumeInfo::NotResumable => None,
+        }
+    }
+}
+
+/// Initialize stream management state
+impl From<sm::Enabled> for SmState {
+    fn from(other: sm::Enabled) -> Self {
+        let resumption = if other.resume {
+            match other.id {
+                Some(id) => SmResumeInfo::Resumable {
+                    location: other.location,
+                    id: id.0,
+                },
+                None => {
+                    log::warn!("peer replied with <enable resume='true'/>, but without an ID! cannot make this stream resumable.");
+                    SmResumeInfo::NotResumable
+                }
+            }
+        } else {
+            SmResumeInfo::NotResumable
+        };
+
+        Self {
+            outbound_base: 0,
+            inbound_ctr: 0,
+            pending_acks: 0,
+            pending_req: false,
+            resumption,
+            unacked_stanzas: VecDeque::new(),
+        }
+    }
+}
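
The wrapping arithmetic in `remote_acked` is easiest to verify with concrete
numbers. A standalone sketch of the same mod-2^32 logic (hypothetical helper
which mirrors, but does not call, `SmState::remote_acked`):

    /// How many queued stanzas does the ack counter `h` cover?
    fn acked_delta(local_base: u32, queue_len: usize, h: u32) -> Result<usize, &'static str> {
        let to_drop = h.wrapping_sub(local_base) as usize;
        if to_drop > queue_len {
            if to_drop as u32 > u32::MAX / 2 {
                // Per RFC 1982, a wrapped difference above half the number
                // space is a negative difference: h went backwards.
                Err("remote ack went backwards")
            } else {
                Err("remote acked more stanzas than we sent")
            }
        } else {
            Ok(to_drop)
        }
    }

    fn main() {
        assert_eq!(acked_delta(10, 5, 12), Ok(2));
        // Wrap-around: the counter ran past u32::MAX and started over.
        assert_eq!(acked_delta(u32::MAX - 1, 5, 1), Ok(3));
        assert!(acked_delta(10, 5, 9).is_err()); // went backwards
    }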

tokio-xmpp/src/stanzastream/worker.rs ๐Ÿ”—

@@ -0,0 +1,612 @@
+// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use core::time::Duration;
+use std::io;
+
+use rand::{thread_rng, Rng};
+
+use futures::{ready, SinkExt, StreamExt};
+
+use tokio::{
+    sync::{mpsc, oneshot},
+    time::Instant,
+};
+
+use xmpp_parsers::{
+    iq,
+    jid::Jid,
+    ping,
+    stream_error::{DefinedCondition, StreamError},
+    stream_features::StreamFeatures,
+};
+
+use crate::connect::AsyncReadAndWrite;
+use crate::xmlstream::{ReadError, XmppStreamElement};
+use crate::Stanza;
+
+use super::connected::{ConnectedEvent, ConnectedState};
+use super::negotiation::NegotiationState;
+use super::queue::{QueueEntry, TransmitQueue};
+use super::stream_management::SmState;
+use super::{Event, StreamEvent};
+
+/// Convenience alias for the [`XmlStream`][`crate::xmlstream::XmlStream`]
+/// type which may be used with a [`StanzaStream`][`super::StanzaStream`].
+pub type XmppStream =
+    crate::xmlstream::XmlStream<Box<dyn AsyncReadAndWrite + Send + 'static>, XmppStreamElement>;
+
+/// Underlying connection for a [`StanzaStream`][`super::StanzaStream`].
+pub struct Connection {
+    /// The stream to use to send and receive XMPP data.
+    pub stream: XmppStream,
+
+    /// The stream features offered by the peer.
+    pub features: StreamFeatures,
+
+    /// The identity to which this stream belongs.
+    ///
+    /// Note that connectors must not return bound streams. However, the Jid
+    /// may still be a full jid in order to request a specific resource at
+    /// bind time. If `identity` is a bare JID, the peer will assign the
+    /// resource.
+    pub identity: Jid,
+}
+
+// Allow for up to 10s for local shutdown.
+// TODO: make this configurable maybe?
+pub(super) static LOCAL_SHUTDOWN_TIMEOUT: Duration = Duration::new(10, 0);
+pub(super) static REMOTE_SHUTDOWN_TIMEOUT: Duration = Duration::new(5, 0);
+pub(super) static PING_PROBE_ID_PREFIX: &str = "xmpp-rs-stanzastream-liveness-probe";
+
+pub(super) enum Never {}
+
+pub(super) enum WorkerEvent {
+    /// The stream was reset and can now be used for rx/tx.
+    Reset {
+        bound_jid: Jid,
+        features: StreamFeatures,
+    },
+
+    /// The stream has been resumed successfully.
+    Resumed,
+
+    /// Data received successfully.
+    Stanza(Stanza),
+
+    /// Failed to parse pieces from the stream.
+    ParseError(xso::error::Error),
+
+    /// Soft timeout noted by the underlying XmppStream.
+    SoftTimeout,
+
+    /// Stream disconnected.
+    Disconnected {
+        /// Slot for a new connection.
+        slot: oneshot::Sender<Connection>,
+
+        /// Set to None if the stream was cleanly closed by the remote side.
+        error: Option<io::Error>,
+    },
+
+    /// The reconnection backend dropped the connection channel.
+    ReconnectAborted,
+}
+
+enum WorkerStream {
+    /// Pending connection.
+    Connecting {
+        /// Optional contents of a [`WorkerEvent::Disconnected`] to emit.
+        notify: Option<(oneshot::Sender<Connection>, Option<io::Error>)>,
+
+        /// Receiver slot for the next connection.
+        slot: oneshot::Receiver<Connection>,
+
+        /// Stream management state from a previous connection.
+        sm_state: Option<SmState>,
+    },
+
+    /// Connection available.
+    Connected {
+        stream: XmppStream,
+        substate: ConnectedState,
+        features: StreamFeatures,
+        identity: Jid,
+    },
+
+    /// Disconnected permanently by local choice.
+    Terminated,
+}
+
+impl WorkerStream {
+    fn disconnect(&mut self, sm_state: Option<SmState>, error: Option<io::Error>) -> WorkerEvent {
+        let (tx, rx) = oneshot::channel();
+        *self = Self::Connecting {
+            notify: None,
+            slot: rx,
+            sm_state,
+        };
+        WorkerEvent::Disconnected { slot: tx, error }
+    }
+
+    fn poll_duplex(
+        self: Pin<&mut Self>,
+        transmit_queue: &mut TransmitQueue<QueueEntry>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<WorkerEvent>> {
+        let this = self.get_mut();
+        loop {
+            match this {
+                // Disconnected cleanly (terminal state), signal end of
+                // stream.
+                Self::Terminated => return Poll::Ready(None),
+
+                // In the progress of reconnecting, wait for reconnection to
+                // complete and then switch states.
+                Self::Connecting {
+                    notify,
+                    slot,
+                    sm_state,
+                } => {
+                    if let Some((slot, error)) = notify.take() {
+                        return Poll::Ready(Some(WorkerEvent::Disconnected { slot, error }));
+                    }
+
+                    match ready!(Pin::new(slot).poll(cx)) {
+                        Ok(Connection {
+                            stream,
+                            features,
+                            identity,
+                        }) => {
+                            let substate = ConnectedState::Negotiating {
+                                // We panic here, but that is ok-ish, because
+                                // that will "only" crash the worker and thus
+                                // the stream, and that is kind of exactly
+                                // what we want.
+                                substate: NegotiationState::new(&features, sm_state.take())
+                                    .expect("Non-negotiable stream"),
+                            };
+                            *this = Self::Connected {
+                                substate,
+                                stream,
+                                features,
+                                identity,
+                            };
+                        }
+                        Err(_) => {
+                            // The sender was dropped. This is fatal.
+                            *this = Self::Terminated;
+                            return Poll::Ready(Some(WorkerEvent::ReconnectAborted));
+                        }
+                    }
+                }
+
+                Self::Connected {
+                    stream,
+                    identity,
+                    substate,
+                    features,
+                } => {
+                    match ready!(substate.poll(
+                        Pin::new(stream),
+                        identity,
+                        &features,
+                        transmit_queue,
+                        cx
+                    )) {
+                        // continue looping if the substate did not produce a result.
+                        None => (),
+
+                        // produced an event to emit.
+                        Some(ConnectedEvent::Worker(v)) => {
+                            match v {
+                                // Capture the JID from a stream reset to
+                                // update our state.
+                                WorkerEvent::Reset { ref bound_jid, .. } => {
+                                    *identity = bound_jid.clone();
+                                }
+                                _ => (),
+                            }
+                            return Poll::Ready(Some(v));
+                        }
+
+                        // stream broke or closed somehow.
+                        Some(ConnectedEvent::Disconnect { sm_state, error }) => {
+                            return Poll::Ready(Some(this.disconnect(sm_state, error)));
+                        }
+
+                        Some(ConnectedEvent::RemoteShutdown { sm_state }) => {
+                            let error = io::Error::new(
+                                io::ErrorKind::ConnectionAborted,
+                                "peer closed the XML stream",
+                            );
+                            let (tx, rx) = oneshot::channel();
+                            let mut new_state = Self::Connecting {
+                                notify: None,
+                                slot: rx,
+                                sm_state,
+                            };
+                            core::mem::swap(this, &mut new_state);
+                            match new_state {
+                                Self::Connected { stream, .. } => {
+                                    tokio::spawn(shutdown_stream_by_remote_choice(
+                                        stream,
+                                        REMOTE_SHUTDOWN_TIMEOUT,
+                                    ));
+                                }
+                                _ => unreachable!(),
+                            }
+
+                            return Poll::Ready(Some(WorkerEvent::Disconnected {
+                                slot: tx,
+                                error: Some(error),
+                            }));
+                        }
+
+                        Some(ConnectedEvent::LocalShutdownRequested) => {
+                            // We don't switch to "terminated" here, but we
+                            // return "end of stream" nontheless.
+                            return Poll::Ready(None);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /// Poll the stream write-only.
+    ///
+    /// This never completes, not even if the `transmit_queue` is empty and
+    /// its sender has been dropped, unless a write error occurs.
+    ///
+    /// The use case behind this is to run this in parallel to a blocking
+    /// operation which should only block the receive side, but not the
+    /// transmit side of the stream.
+    ///
+    /// Calling this and `poll_duplex` from different tasks in parallel will
+    /// cause havoc.
+    ///
+    /// Any errors are reported on the next call to `poll_duplex`.
+    fn poll_writes(
+        &mut self,
+        transmit_queue: &mut TransmitQueue<QueueEntry>,
+        cx: &mut Context,
+    ) -> Poll<Never> {
+        match self {
+            Self::Terminated | Self::Connecting { .. } => Poll::Pending,
+            Self::Connected {
+                substate, stream, ..
+            } => {
+                ready!(substate.poll_writes(Pin::new(stream), transmit_queue, cx));
+                Poll::Pending
+            }
+        }
+    }
+
+    fn start_send_stream_error(&mut self, error: StreamError) {
+        match self {
+            // If we are not connected or still connecting, we feign success
+            // and enter the Terminated state.
+            Self::Terminated | Self::Connecting { .. } => {
+                *self = Self::Terminated;
+            }
+
+            Self::Connected { substate, .. } => substate.start_send_stream_error(error),
+        }
+    }
+
+    fn poll_close(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
+        match self {
+            Self::Terminated => Poll::Ready(Ok(())),
+            Self::Connecting { .. } => {
+                *self = Self::Terminated;
+                Poll::Ready(Ok(()))
+            }
+            Self::Connected {
+                substate, stream, ..
+            } => {
+                let result = ready!(substate.poll_close(Pin::new(stream), cx));
+                *self = Self::Terminated;
+                Poll::Ready(result)
+            }
+        }
+    }
+
+    fn drive_duplex<'a>(
+        &'a mut self,
+        transmit_queue: &'a mut TransmitQueue<QueueEntry>,
+    ) -> DriveDuplex<'a> {
+        DriveDuplex {
+            stream: Pin::new(self),
+            queue: transmit_queue,
+        }
+    }
+
+    fn drive_writes<'a>(
+        &'a mut self,
+        transmit_queue: &'a mut TransmitQueue<QueueEntry>,
+    ) -> DriveWrites<'a> {
+        DriveWrites {
+            stream: Pin::new(self),
+            queue: transmit_queue,
+        }
+    }
+
+    fn close(&mut self) -> Close {
+        Close {
+            stream: Pin::new(self),
+        }
+    }
+
+    /// Enqueue a `<sm:r/>`, if stream management is enabled.
+    ///
+    /// Multiple calls to `queue_sm_request` may cause only a single `<sm:r/>`
+    /// to be sent.
+    ///
+    /// Returns true if stream management is enabled and a request could be
+    /// queued or deduplicated with a previous request.
+    fn queue_sm_request(&mut self) -> bool {
+        match self {
+            Self::Terminated | Self::Connecting { .. } => false,
+            Self::Connected { substate, .. } => substate.queue_sm_request(),
+        }
+    }
+}
+
+struct DriveDuplex<'x> {
+    stream: Pin<&'x mut WorkerStream>,
+    queue: &'x mut TransmitQueue<QueueEntry>,
+}
+
+impl<'x> Future for DriveDuplex<'x> {
+    type Output = Option<WorkerEvent>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+        let this = self.get_mut();
+        this.stream.as_mut().poll_duplex(this.queue, cx)
+    }
+}
+
+struct DriveWrites<'x> {
+    stream: Pin<&'x mut WorkerStream>,
+    queue: &'x mut TransmitQueue<QueueEntry>,
+}
+
+impl<'x> Future for DriveWrites<'x> {
+    type Output = Never;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+        let this = self.get_mut();
+        this.stream.as_mut().poll_writes(this.queue, cx)
+    }
+}
+
+struct Close<'x> {
+    stream: Pin<&'x mut WorkerStream>,
+}
+
+impl<'x> Future for Close<'x> {
+    type Output = io::Result<()>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+        let this = self.get_mut();
+        this.stream.as_mut().poll_close(cx)
+    }
+}
+
+pub(super) fn parse_error_to_stream_error(e: xso::error::Error) -> StreamError {
+    use xso::error::Error;
+    let condition = match e {
+        Error::XmlError(_) => DefinedCondition::NotWellFormed,
+        Error::TextParseError(_) | Error::Other(_) => DefinedCondition::InvalidXml,
+        Error::TypeMismatch => DefinedCondition::UnsupportedStanzaType,
+    };
+    StreamError {
+        condition,
+        text: Some((None, e.to_string())),
+        application_specific: vec![],
+    }
+}
+
+/// Worker system for a [`StanzaStream`].
+pub(super) struct StanzaStreamWorker {
+    reconnector: Box<dyn FnMut(Option<String>, oneshot::Sender<Connection>) + Send + 'static>,
+    frontend_tx: mpsc::Sender<Event>,
+    stream: WorkerStream,
+    transmit_queue: TransmitQueue<QueueEntry>,
+}
+
+macro_rules! send_or_break {
+    ($value:expr => $permit:ident in $ch:expr, $txq:expr => $stream:expr$(,)?) => {
+        if let Some(permit) = $permit.take() {
+            log::trace!("stanza received, passing to frontend via permit");
+            permit.send($value);
+        } else {
+            log::trace!("no permit for received stanza available, blocking on channel send while handling writes");
+            tokio::select! {
+                // drive_writes never completes: I/O errors are reported on
+                // the next call to drive_duplex(), which makes it ideal for
+                // use in parallel to $ch.send().
+                result = $stream.drive_writes(&mut $txq) => { match result {} },
+                result = $ch.send($value) => match result {
+                    Err(_) => break,
+                    Ok(()) => (),
+                },
+            }
+        }
+    };
+}
+
+impl StanzaStreamWorker {
+    pub fn spawn(
+        mut reconnector: Box<
+            dyn FnMut(Option<String>, oneshot::Sender<Connection>) + Send + 'static,
+        >,
+        queue_depth: usize,
+    ) -> (mpsc::Sender<QueueEntry>, mpsc::Receiver<Event>) {
+        let (conn_tx, conn_rx) = oneshot::channel();
+        reconnector(None, conn_tx);
+        // c2f = core to frontend
+        let (c2f_tx, c2f_rx) = mpsc::channel(queue_depth);
+        // f2c = frontend to core
+        let (f2c_tx, transmit_queue) = TransmitQueue::channel(queue_depth);
+        let mut worker = StanzaStreamWorker {
+            reconnector,
+            frontend_tx: c2f_tx,
+            stream: WorkerStream::Connecting {
+                slot: conn_rx,
+                sm_state: None,
+                notify: None,
+            },
+            transmit_queue,
+        };
+        tokio::spawn(async move { worker.run().await });
+        (f2c_tx, c2f_rx)
+    }
+
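+    /// Main worker loop: drive the stream, dispatch [`WorkerEvent`]s to
+    /// the frontend, and trigger reconnects until either side shuts the
+    /// stream down.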
+    pub async fn run(&mut self) {
+        // TODO: consider moving this into SmState somehow, i.e. run a kind
+        // of fake stream management exploiting the sequentiality requirement
+        // from RFC 6120.
+        // NOTE: we use a random starting value here to avoid clashes with
+        // other application code.
+        let mut ping_probe_ctr: u64 = thread_rng().gen();
+
+        // We use mpsc::Sender permits (check the docs on
+        // [`tokio::sync::mpsc::Sender::reserve`]) as a way to avoid blocking
+        // on the `frontend_tx` whenever possible.
+        //
+        // We always try to have a permit available. If we have a permit
+        // available, any event we receive from the stream can be sent to
+        // the frontend tx without blocking. If we do not have a permit
+        // available, the code generated by the send_or_break macro will
+        // use the normal Sender::send coroutine function, but will also
+        // service stream writes in parallel (putting backpressure on the
+        // sender while not blocking writes on our end).
+        let mut permit = None;
+        loop {
+            tokio::select! {
+                new_permit = self.frontend_tx.reserve(), if permit.is_none() && !self.frontend_tx.is_closed() => match new_permit {
+                    Ok(new_permit) => permit = Some(new_permit),
+                    // Receiver side dropped… That means the frontend has
+                    // closed the stream, so we shut everything down and exit.
+                    Err(_) => break,
+                },
+                ev = self.stream.drive_duplex(&mut self.transmit_queue) => {
+                    let Some(ev) = ev else {
+                        // Stream terminated by local choice. Exit.
+                        break;
+                    };
+                    match ev {
+                        WorkerEvent::Reset { bound_jid, features } => send_or_break!(
+                            Event::Stream(StreamEvent::Reset { bound_jid, features }) => permit in self.frontend_tx,
+                            self.transmit_queue => self.stream,
+                        ),
+                        WorkerEvent::Disconnected { slot, error } => {
+                            send_or_break!(
+                                Event::Stream(StreamEvent::Suspended) => permit in self.frontend_tx,
+                                self.transmit_queue => self.stream,
+                            );
+                            if let Some(error) = error {
+                                log::debug!("Backend stream got disconnected because of an I/O error: {error}. Attempting reconnect.");
+                            } else {
+                                log::debug!("Backend stream got disconnected for an unknown reason. Attempting reconnect.");
+                            }
+                            if self.frontend_tx.is_closed() || self.transmit_queue.is_closed() {
+                                log::debug!("Immediately aborting reconnect because the frontend is gone.");
+                                break;
+                            }
+                            (self.reconnector)(None, slot);
+                        }
+                        WorkerEvent::Resumed => send_or_break!(
+                            Event::Stream(StreamEvent::Resumed) => permit in self.frontend_tx,
+                            self.transmit_queue => self.stream,
+                        ),
+                        WorkerEvent::Stanza(stanza) => send_or_break!(
+                            Event::Stanza(stanza) => permit in self.frontend_tx,
+                            self.transmit_queue => self.stream,
+                        ),
+                        WorkerEvent::ParseError(e) => {
+                            log::error!("Parse error on stream: {e}");
+                            self.stream.start_send_stream_error(parse_error_to_stream_error(e));
+                            // We are not break-ing here, because drive_duplex
+                            // is sending the error.
+                        }
+                        WorkerEvent::SoftTimeout => {
+                            if self.stream.queue_sm_request() {
+                                log::debug!("SoftTimeout tripped: enqueued <sm:r/>");
+                            } else {
+                                log::debug!("SoftTimeout tripped. Stream Management is not enabled, enqueueing ping IQ");
+                                ping_probe_ctr = ping_probe_ctr.wrapping_add(1);
+                                // We can leave to/from blank because those
+                                // are not needed to send a ping to the peer.
+                                // (At least that holds true on c2s streams.
+                                // On s2s, things are more complicated anyway
+                                // due to how bidi works.)
+                                self.transmit_queue.enqueue(QueueEntry::untracked(Box::new(iq::Iq::from_get(
+                                    format!("{}-{}", PING_PROBE_ID_PREFIX, ping_probe_ctr),
+                                    ping::Ping,
+                                ).into())));
+                            }
+                        }
+                        WorkerEvent::ReconnectAborted => {
+                            panic!("Backend was unable to handle reconnect request.");
+                        }
+                    }
+                },
+            }
+        }
+        match self.stream.close().await {
+            Ok(()) => log::debug!("Stream closed successfully"),
+            Err(e) => log::debug!("Stream closure failed: {e}"),
+        }
+    }
+}
+
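+/// Wind down a stream which the remote side chose to terminate: send our
+/// stream footer and wait briefly for the stream to settle, giving up
+/// once `timeout` has elapsed.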
+async fn shutdown_stream_by_remote_choice(mut stream: XmppStream, timeout: Duration) {
+    let deadline = Instant::now() + timeout;
+    match tokio::time::timeout_at(
+        deadline,
+        <XmppStream as SinkExt<&Stanza>>::close(&mut stream),
+    )
+    .await
+    {
+        // We don't really care about success or failure here.
+        Ok(_) => (),
+        // ... but if we ran into a timeout, we exit right away.
+        Err(_) => {
+            log::debug!("Giving up on clean stream shutdown after timeout elapsed.");
+            return;
+        }
+    }
+    let timeout = tokio::time::sleep_until(deadline);
+    tokio::pin!(timeout);
+    loop {
+        tokio::select! {
+            _ = &mut timeout => {
+                log::debug!("Giving up on clean stream shutdown after timeout elapsed.");
+                break;
+            }
+            ev = stream.next() => match ev {
+                None => break,
+                Some(Ok(data)) => {
+                    log::debug!("Ignoring data on stream during shutdown: {data:?}");
+                    break;
+                }
+                Some(Err(ReadError::HardError(e))) => {
+                    log::debug!("Ignoring stream I/O error during shutdown: {e}");
+                    break;
+                }
+                Some(Err(ReadError::SoftTimeout)) => (),
+                Some(Err(ReadError::ParseError(_))) => (),
+                Some(Err(ReadError::StreamFooterReceived)) => (),
+            }
+        }
+    }
+}

tokio-xmpp/src/xmlstream/mod.rs 🔗

@@ -7,7 +7,8 @@
 //! # RFC 6120 XML Streams
 //!
 //! **Note:** The XML stream is a low-level API which you should probably not
-//! use directly.
+//! use directly. You may be looking for
+//! [`StanzaStream`][`crate::stanzastream::StanzaStream`] instead.
 //!
 //! Establishing an XML stream is always a multi-stage process due to how
 //! stream negotiation works. Based on the values sent by the initiator in the
@@ -239,6 +240,10 @@ pin_project_lite::pin_project! {
     /// [RFC 6120](https://tools.ietf.org/html/rfc6120) XML stream, where the
     /// payload consists of items of type `T` implementing [`FromXml`] and
     /// [`AsXml`].
+    ///
+    /// **Note:** The XML stream is a low-level API which you should probably
+    /// not use directly. You may be looking for
+    /// [`StanzaStream`][`crate::stanzastream::StanzaStream`] instead.
     pub struct XmlStream<Io, T: FromXml> {
         #[pin]
         inner: RawXmlStream<Io>,

tokio-xmpp/src/xmlstream/xmpp.rs 🔗

@@ -6,7 +6,7 @@
 
 use xso::{AsXml, FromXml};
 
-use xmpp_parsers::{component, sasl, starttls, stream_error::ReceivedStreamError};
+use xmpp_parsers::{component, sasl, sm, starttls, stream_error::ReceivedStreamError};
 
 use crate::Stanza;
 
@@ -33,4 +33,8 @@ pub enum XmppStreamElement {
     /// Stream error received
     #[xml(transparent)]
     StreamError(ReceivedStreamError),
+
+    /// XEP-0198 nonzas
+    #[xml(transparent)]
+    SM(sm::Nonza),
 }