tokio_xmpp: introduce xmlstream module

Jonas Schäfer created

This module provides XSO-based parsing, proper typestates and will
soon replace the proto module.

Change summary

tokio-xmpp/Cargo.toml                 |   2 
tokio-xmpp/ChangeLog                  |   4 
tokio-xmpp/src/lib.rs                 |   1 
tokio-xmpp/src/xmlstream/common.rs    | 591 +++++++++++++++++++++++++++++
tokio-xmpp/src/xmlstream/initiator.rs |  63 +++
tokio-xmpp/src/xmlstream/mod.rs       | 330 ++++++++++++++++
tokio-xmpp/src/xmlstream/responder.rs |  98 ++++
tokio-xmpp/src/xmlstream/tests.rs     | 264 ++++++++++++
8 files changed, 1,351 insertions(+), 2 deletions(-)

Detailed changes

tokio-xmpp/Cargo.toml đź”—

@@ -23,10 +23,12 @@ rustls-native-certs = { version = "0.7", optional = true }
 rxml = { version = "0.12.0", features = ["compact_str"] }
 rand = "0.8"
 syntect = { version = "5", optional = true }
+pin-project-lite = { version = "0.2" }
 # same repository dependencies
 sasl = { version = "0.5", path = "../sasl" }
 xmpp-parsers = { version = "0.21", path = "../parsers" }
 minidom = { version = "0.16", path = "../minidom" }
+xso = { version = "0.1", path = "../xso" }
 
 # these are only needed for starttls ServerConnector support
 hickory-resolver = { version = "0.24", optional = true}

tokio-xmpp/ChangeLog đź”—

@@ -15,8 +15,6 @@ XXXX-YY-ZZ RELEASER <admin@example.com>
       - `AsyncClient::poll_next` properly closes stream with `Poll::Ready(None)` when disconnecting without auto reconnect (!436)
       - remove `tokio_xmpp::SimpleClient` because it was not widely used, and not well documented ; if you need it,
         please let us know and it will be reintegrated (!428)
-      - `XMPPStream` was renamed `XmppStream` and is now published as `tokio_xmpp::proto::XmppStream` (!428)
-      - `XmppCodec` was moved to proto module and is now published as `tokio_xmpp::proto::XmppCodec` (!428)
       - `Component::new` and `Client::new only require jid/password argument (!428)
       - `ServerConfig` and `Client::new_with_config` have been removed (!428)
       - ``Client` now has `new_plaintext`, `new_starttls` and `new_with_connector` method (!428)
@@ -25,6 +23,8 @@ XXXX-YY-ZZ RELEASER <admin@example.com>
       - `Component` now has `new_plaintext` and `new_with_connector` constructors, just like `Client` but without StartTLS (!428)
       - `tokio_xmpp::AsyncClient` has been renamed `tokio_xmpp::Client` (!428)
       - `Component` is now gated behind `insecure-tcp` feature flag
+      - `XMPPStream` and `XmppCodec` were removed in favour of the newly
+        implemented `tokio_xmpp::xmlstream module.
 
 Version 4.0.0:
 2024-07-26 Maxime “pep” Buquet <pep@bouah.net>

tokio-xmpp/src/lib.rs đź”—

@@ -50,6 +50,7 @@ mod event;
 pub use event::Event;
 pub mod connect;
 pub mod proto;
+pub mod xmlstream;
 
 mod client;
 pub use client::Client;

tokio-xmpp/src/xmlstream/common.rs đź”—

@@ -0,0 +1,591 @@
+// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use std::borrow::Cow;
+use std::io;
+
+use futures::{ready, Sink, SinkExt, Stream, StreamExt};
+
+use bytes::{Buf, BytesMut};
+
+use tokio::io::{AsyncBufRead, AsyncWrite};
+
+use xso::{
+    exports::rxml::{self, writer::TrackNamespace, xml_ncname, Event, Namespace},
+    FromEventsBuilder, FromXml, Item,
+};
+
+use xmpp_parsers::ns::STREAM as XML_STREAM_NS;
+
+pin_project_lite::pin_project! {
+    // NOTE: due to limitations of pin_project_lite, the field comments are
+    // no doc comments. Luckily, this struct is only `pub(super)` anyway.
+    #[project = RawXmlStreamProj]
+    pub(super) struct RawXmlStream<Io> {
+        // The parser used for deserialising data.
+        #[pin]
+        parser: rxml::AsyncReader<Io>,
+
+        // The writer used for serialising data.
+        writer: rxml::writer::Encoder<rxml::writer::SimpleNamespaces>,
+
+        // The default namespace to declare on the stream header.
+        stream_ns: &'static str,
+
+        // Buffer containing serialised data which will then be sent through
+        // the inner `Io`. Sending that serialised data happens in
+        // `poll_ready` and `poll_flush`, while appending serialised data
+        // happens in `start_send`.
+        tx_buffer: BytesMut,
+
+        // This signifies the limit at the point of which the Sink will
+        // refuse to accept more data: if the `tx_buffer`'s size grows beyond
+        // that high water mark, poll_ready will return Poll::Pending until
+        // it has managed to flush enough data down the inner writer.
+        //
+        // Note that poll_ready will always attempt to progress the writes,
+        // which further reduces the chance of hitting this limit unless
+        // either the underlying writer gets stuck (e.g. TCP connection
+        // breaking in a timeouty way) or a lot of data is written in bulk.
+        // In both cases, the backpressure created by poll_ready returning
+        // Pending is desirable.
+        //
+        // However, there is a catch: We don't assert this condition
+        // in `start_send` at all. The reason is that we cannot suspend
+        // serialisation of an XSO in the middle of writing it: it has to be
+        // written in one batch or you have to start over later (this has to
+        // do with the iterator state borrowing the data and futures getting
+        // cancelled e.g. in tokio::select!). In order to facilitate
+        // implementing a `Sink<T: AsXml>` on top of `RawXmlStream`, we
+        // cannot be strict about what is going on in `start_send`:
+        // `poll_ready` does not know what kind of data will be written (so
+        // it could not make a size estimate, even if that was at all
+        // possible with AsXml) and `start_send` is not a coroutine. So if
+        // `Sink<T: AsXml>` wants to use `RawXmlStream`, it must be able to
+        // submit an entire XSO's items in one batch to `RawXmlStream` after
+        // it has reported to be ready once. That may easily make the buffer
+        // reach its high water mark.
+        //
+        // So if we checked that condition in `start_send` (as opposed to
+        // `poll_ready`), we would cause situations where submitting XSOs
+        // failed randomly (with a panic or other errors) and would have to
+        // be retried later.
+        //
+        // While failing with e.g. io::ErrorKind::WouldBlock is something
+        // that could be investigated later, it would still require being
+        // able to make an accurate estimate of the number of bytes needed to
+        // serialise any given `AsXml`, because as pointed out earlier, once
+        // we have started, there is no going back.
+        //
+        // Finally, none of that hurts much because `RawXmlStream` is only an
+        // internal API. The high-level APIs will always call `poll_ready`
+        // before sending an XSO, which means that we won't *grossly* go over
+        // the TX buffer high water mark---unless you send a really large
+        // XSO at once.
+        tx_buffer_high_water_mark: usize,
+    }
+}
+
+impl<Io: AsyncBufRead + AsyncWrite> RawXmlStream<Io> {
+    fn new_writer(
+        stream_ns: &'static str,
+    ) -> rxml::writer::Encoder<rxml::writer::SimpleNamespaces> {
+        let mut writer = rxml::writer::Encoder::new();
+        writer
+            .ns_tracker_mut()
+            .declare_fixed(Some(xml_ncname!("stream")), XML_STREAM_NS.into());
+        writer
+            .ns_tracker_mut()
+            .declare_fixed(None, stream_ns.into());
+        writer
+    }
+
+    pub(super) fn new(io: Io, stream_ns: &'static str) -> Self {
+        let parser = rxml::Parser::default();
+        Self {
+            parser: rxml::AsyncReader::wrap(io, parser),
+            writer: Self::new_writer(stream_ns),
+            stream_ns,
+            tx_buffer: BytesMut::new(),
+
+            // This basically means: "if we already have 2 kiB in our send
+            // buffer, do not accept more data".
+            // Please see the extensive words at
+            //`Self::tx_buffer_high_water_mark` for details.
+            tx_buffer_high_water_mark: 2048,
+        }
+    }
+
+    pub(super) fn reset_state(self: Pin<&mut Self>) {
+        let this = self.project();
+        *this.parser.parser_pinned() = rxml::Parser::default();
+        *this.writer = Self::new_writer(this.stream_ns);
+    }
+}
+
+impl<Io> RawXmlStream<Io> {
+    fn parser_pinned(self: Pin<&mut Self>) -> &mut rxml::Parser {
+        self.project().parser.parser_pinned()
+    }
+}
+
+impl<Io: AsyncBufRead> Stream for RawXmlStream<Io> {
+    type Item = Result<rxml::Event, io::Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+        loop {
+            return Poll::Ready(
+                match ready!(this.parser.as_mut().poll_read(cx)).transpose() {
+                    // Skip the XML declaration, nobody wants to hear about that.
+                    Some(Ok(rxml::Event::XmlDeclaration(_, _))) => continue,
+                    other => other,
+                },
+            );
+        }
+    }
+}
+
+impl<'x, Io: AsyncWrite> RawXmlStreamProj<'x, Io> {
+    fn progress_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+        while self.tx_buffer.len() > 0 {
+            let written = match ready!(self
+                .parser
+                .as_mut()
+                .inner_pinned()
+                .poll_write(cx, &self.tx_buffer))
+            {
+                Ok(v) => v,
+                Err(e) => return Poll::Ready(Err(e)),
+            };
+            self.tx_buffer.advance(written);
+        }
+        Poll::Ready(Ok(()))
+    }
+}
+
+impl<'x, Io: AsyncWrite> Sink<xso::Item<'x>> for RawXmlStream<Io> {
+    type Error = io::Error;
+
+    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        let mut this = self.project();
+        match this.progress_write(cx) {
+            // No progress on write, but if we have enough space in the buffer
+            // it's ok nonetheless.
+            Poll::Pending => (),
+            // Some progress and it went fine, move on.
+            Poll::Ready(Ok(())) => (),
+            // Something went wrong -> return the error.
+            Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
+        }
+        if this.tx_buffer.len() < *this.tx_buffer_high_water_mark {
+            Poll::Ready(Ok(()))
+        } else {
+            Poll::Pending
+        }
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        let mut this = self.project();
+        ready!(this.progress_write(cx))?;
+        this.parser.as_mut().inner_pinned().poll_flush(cx)
+    }
+
+    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        let mut this = self.project();
+        ready!(this.progress_write(cx))?;
+        this.parser.as_mut().inner_pinned().poll_shutdown(cx)
+    }
+
+    fn start_send(self: Pin<&mut Self>, item: xso::Item<'x>) -> Result<(), Self::Error> {
+        let this = self.project();
+        this.writer
+            .encode_into_bytes(item.as_rxml_item(), this.tx_buffer)
+            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
+    }
+}
+
+/// Error returned by the [`ReadXso`] future and the [`ReadXsoState`] helper.
+pub(super) enum ReadXsoError {
+    /// The outer element was closed before a child element could be read.
+    ///
+    /// This is typically the stream footer in XML stream applications.
+    Footer,
+
+    /// A hard error occurred.
+    ///
+    /// This is either a real I/O error or an error from the XML parser.
+    /// Neither are recoverable, because the nesting state is lost and
+    /// in addition, XML errors are not recoverable because they indicate a
+    /// not well-formed document.
+    Hard(io::Error),
+
+    /// A parse error occurred.
+    ///
+    /// The XML structure was well-formed, but the data contained did not
+    /// match the XSO which was attempted to be parsed. This error is
+    /// recoverable: when this error is emitted, the XML stream is at the same
+    /// nesting level as it was before the XSO was attempted to be read; all
+    /// XML structure which belonged to the XSO which failed to parse has
+    /// been consumed. This allows to read more XSOs even if one fails to
+    /// parse.
+    Parse(xso::error::Error),
+}
+
+impl From<ReadXsoError> for io::Error {
+    fn from(other: ReadXsoError) -> Self {
+        match other {
+            ReadXsoError::Hard(v) => v,
+            ReadXsoError::Parse(e) => io::Error::new(io::ErrorKind::InvalidData, e),
+            ReadXsoError::Footer => io::Error::new(
+                io::ErrorKind::UnexpectedEof,
+                "element footer while waiting for XSO element start",
+            ),
+        }
+    }
+}
+
+impl From<io::Error> for ReadXsoError {
+    fn from(other: io::Error) -> Self {
+        Self::Hard(other)
+    }
+}
+
+impl From<xso::error::Error> for ReadXsoError {
+    fn from(other: xso::error::Error) -> Self {
+        Self::Parse(other)
+    }
+}
+
+/// State for reading an XSO from a `Stream<Item = Result<rxml::Event, ...>>`.
+///
+/// Due to pinning, it is simpler to implement the statemachine in a dedicated
+/// enum and let the actual (pinned) future pass the stream toward this enum's
+/// function.
+///
+/// This is used by both [`ReadXso`] and the [`super::XmlStream`] itself.
+#[derive(Default)]
+pub(super) enum ReadXsoState<T: FromXml> {
+    /// The [`rxml::Event::StartElement`] event was not seen yet.
+    ///
+    /// In this state, XML whitespace is ignored (as per RFC 6120 § 11.7), but
+    /// other text data is rejected.
+    #[default]
+    PreData,
+
+    /// The [`rxml::Event::StartElement`] event was received.
+    ///
+    /// The inner value is the builder for the "return type" of this enum and
+    /// the implementation in the [`xso`] crate does all the heavy lifting:
+    /// we'll only send events in its general direction.
+    // We use the fallible parsing here so that we don't have to do the depth
+    // accounting ourselves.
+    Parsing(<Result<T, xso::error::Error> as FromXml>::Builder),
+
+    /// The parsing has completed (successful or not).
+    ///
+    /// This is a final state and attempting to advance the state will panic.
+    /// This is in accordance with [`core::future::Future::poll`]'s contract,
+    /// for which this enum is primarily used.
+    Done,
+}
+
+impl<T: FromXml> ReadXsoState<T> {
+    /// Progress reading the XSO from the given source.
+    ///
+    /// This attempts to parse a single XSO from the underlying stream,
+    /// while discarding any XML whitespace before the beginning of the XSO.
+    ///
+    /// If the XSO is parsed successfully, the method returns Ready with the
+    /// parsed value. If parsing fails or an I/O error occurs, an appropriate
+    /// error is returned.
+    ///
+    /// If parsing fails, the entire XML subtree belonging to the XSO is
+    /// nonetheless processed. That makes parse errors recoverable: After
+    /// `poll_advance` has returned Ready with either  an Ok result or a
+    /// [`ReadXsoError::Parse`] error variant, another XSO can be read and the
+    /// XML parsing will be at the same nesting depth as it was before the
+    /// first call to `poll_advance`.
+    ///
+    /// Note that this guarantee does not hold for non-parse errors (i.e. for
+    /// the other variants of [`ReadXsoError`]): I/O errors as well as
+    /// occurrence of the outer closing element are fatal.
+    ///
+    /// The `source` passed to `poll_advance` should be the same on every
+    /// call.
+    pub(super) fn poll_advance<Io: AsyncBufRead>(
+        &mut self,
+        mut source: Pin<&mut RawXmlStream<Io>>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<T, ReadXsoError>> {
+        loop {
+            // Disable text buffering before the start event. That way, we
+            // don't accumulate infinite amounts of XML whitespace caused by
+            // whitespace keepalives.
+            // (And also, we'll know faster when the remote side sends
+            // non-whitespace garbage.)
+            let text_buffering = match self {
+                ReadXsoState::PreData => false,
+                _ => true,
+            };
+            source
+                .as_mut()
+                .parser_pinned()
+                .set_text_buffering(text_buffering);
+            let ev = ready!(source.as_mut().poll_next(cx)).transpose()?;
+            match self {
+                ReadXsoState::PreData => match ev {
+                    Some(rxml::Event::XmlDeclaration(_, _)) => (),
+                    Some(rxml::Event::Text(_, data)) => {
+                        if xso::is_xml_whitespace(data.as_bytes()) {
+                            continue;
+                        } else {
+                            *self = ReadXsoState::Done;
+                            return Poll::Ready(Err(io::Error::new(
+                                io::ErrorKind::InvalidData,
+                                "non-whitespace text content before XSO",
+                            )
+                            .into()));
+                        }
+                    }
+                    Some(rxml::Event::StartElement(_, name, attrs)) => {
+                        *self = ReadXsoState::Parsing(
+                            <Result<T, xso::error::Error> as FromXml>::from_events(name, attrs)
+                                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
+                        );
+                    }
+                    // Amounts to EOF, as we expect to start on the stream level.
+                    Some(rxml::Event::EndElement(_)) => {
+                        *self = ReadXsoState::Done;
+                        return Poll::Ready(Err(ReadXsoError::Footer));
+                    }
+                    None => {
+                        *self = ReadXsoState::Done;
+                        return Poll::Ready(Err(io::Error::new(
+                            io::ErrorKind::UnexpectedEof,
+                            "end of parent element before XSO started",
+                        )
+                        .into()));
+                    }
+                },
+                ReadXsoState::Parsing(builder) => {
+                    let Some(ev) = ev else {
+                        *self = ReadXsoState::Done;
+                        return Poll::Ready(Err(io::Error::new(
+                            io::ErrorKind::UnexpectedEof,
+                            "eof during XSO parsing",
+                        )
+                        .into()));
+                    };
+
+                    match builder.feed(ev) {
+                        Err(err) => {
+                            *self = ReadXsoState::Done;
+                            return Poll::Ready(Err(io::Error::new(
+                                io::ErrorKind::InvalidData,
+                                err,
+                            )
+                            .into()));
+                        }
+                        Ok(Some(Err(err))) => {
+                            *self = ReadXsoState::Done;
+                            return Poll::Ready(Err(ReadXsoError::Parse(err)));
+                        }
+                        Ok(Some(Ok(value))) => {
+                            *self = ReadXsoState::Done;
+                            return Poll::Ready(Ok(value));
+                        }
+                        Ok(None) => (),
+                    }
+                }
+
+                // The error talks about "future", simply because that is
+                // where `Self` is used (inside `core::future::Future::poll`).
+                ReadXsoState::Done => panic!("future polled after completion"),
+            }
+        }
+    }
+}
+
+/// Future to read a single XSO from a stream.
+pub(super) struct ReadXso<'x, Io, T: FromXml> {
+    /// Stream to read the future from.
+    inner: Pin<&'x mut RawXmlStream<Io>>,
+
+    /// Current state of parsing.
+    state: ReadXsoState<T>,
+}
+
+impl<'x, Io: AsyncBufRead, T: FromXml> ReadXso<'x, Io, T> {
+    /// Start reading a single XSO from a stream.
+    pub(super) fn read_from(stream: Pin<&'x mut RawXmlStream<Io>>) -> Self {
+        Self {
+            inner: stream,
+            state: ReadXsoState::PreData,
+        }
+    }
+}
+
+impl<'x, Io: AsyncBufRead, T: FromXml> Future for ReadXso<'x, Io, T>
+where
+    T::Builder: Unpin,
+{
+    type Output = Result<T, ReadXsoError>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let this = self.get_mut();
+        this.state.poll_advance(this.inner.as_mut(), cx)
+    }
+}
+
+/// Contains metadata from an XML stream header
+#[derive(Default)]
+pub struct StreamHeader<'x> {
+    /// The optional `from` attribute.
+    pub from: Option<Cow<'x, str>>,
+
+    /// The optional `to` attribute.
+    pub to: Option<Cow<'x, str>>,
+
+    /// The optional `id` attribute.
+    pub id: Option<Cow<'x, str>>,
+}
+
+impl<'x> StreamHeader<'x> {
+    /// Take the contents and return them as new object.
+    ///
+    /// `self` will be left with all its parts set to `None`.
+    pub fn take(&mut self) -> Self {
+        Self {
+            from: self.from.take(),
+            to: self.to.take(),
+            id: self.id.take(),
+        }
+    }
+
+    pub(super) async fn send<Io: AsyncWrite>(
+        self,
+        mut stream: Pin<&mut RawXmlStream<Io>>,
+    ) -> io::Result<()> {
+        stream
+            .send(Item::XmlDeclaration(rxml::XmlVersion::V1_0))
+            .await?;
+        stream
+            .send(Item::ElementHeadStart(
+                Namespace::from(XML_STREAM_NS),
+                Cow::Borrowed(xml_ncname!("stream")),
+            ))
+            .await?;
+        if let Some(from) = self.from {
+            stream
+                .send(Item::Attribute(
+                    Namespace::NONE,
+                    Cow::Borrowed(xml_ncname!("from")),
+                    from,
+                ))
+                .await?;
+        }
+        if let Some(to) = self.to {
+            stream
+                .send(Item::Attribute(
+                    Namespace::NONE,
+                    Cow::Borrowed(xml_ncname!("to")),
+                    to,
+                ))
+                .await?;
+        }
+        if let Some(id) = self.id {
+            stream
+                .send(Item::Attribute(
+                    Namespace::NONE,
+                    Cow::Borrowed(xml_ncname!("id")),
+                    id,
+                ))
+                .await?;
+        }
+        stream
+            .send(Item::Attribute(
+                Namespace::NONE,
+                Cow::Borrowed(xml_ncname!("version")),
+                Cow::Borrowed("1.0"),
+            ))
+            .await?;
+        stream.send(Item::ElementHeadEnd).await?;
+        Ok(())
+    }
+}
+
+impl StreamHeader<'static> {
+    pub(super) async fn recv<Io: AsyncBufRead>(
+        mut stream: Pin<&mut RawXmlStream<Io>>,
+    ) -> io::Result<Self> {
+        loop {
+            match stream.as_mut().next().await.transpose()? {
+                Some(Event::StartElement(_, (ns, name), mut attrs)) => {
+                    if ns != XML_STREAM_NS || name != "stream" {
+                        return Err(io::Error::new(
+                            io::ErrorKind::InvalidData,
+                            "unknown stream header",
+                        ));
+                    }
+
+                    match attrs.remove(Namespace::none(), "version") {
+                        Some(v) => {
+                            if v != "1.0" {
+                                return Err(io::Error::new(
+                                    io::ErrorKind::InvalidData,
+                                    format!("unsuppored stream version: {}", v),
+                                ));
+                            }
+                        }
+                        None => {
+                            return Err(io::Error::new(
+                                io::ErrorKind::InvalidData,
+                                "required `version` attribute missing",
+                            ))
+                        }
+                    }
+
+                    let from = attrs.remove(Namespace::none(), "from");
+                    let to = attrs.remove(Namespace::none(), "to");
+                    let id = attrs.remove(Namespace::none(), "id");
+                    let _ = attrs.remove(Namespace::xml(), "lang");
+
+                    if let Some(((ns, name), _)) = attrs.into_iter().next() {
+                        return Err(io::Error::new(
+                            io::ErrorKind::InvalidData,
+                            format!("unexpected stream header attribute: {{{}}}{}", ns, name),
+                        ));
+                    }
+
+                    return Ok(StreamHeader {
+                        from: from.map(Cow::Owned),
+                        to: to.map(Cow::Owned),
+                        id: id.map(Cow::Owned),
+                    });
+                }
+                Some(Event::Text(_, _)) | Some(Event::EndElement(_)) => {
+                    return Err(io::Error::new(
+                        io::ErrorKind::UnexpectedEof,
+                        "unexpected content before stream header",
+                    ))
+                }
+                // We cannot loop infinitely here because the XML parser will
+                // prevent more than one XML declaration from being parsed.
+                Some(Event::XmlDeclaration(_, _)) => (),
+                None => {
+                    return Err(io::Error::new(
+                        io::ErrorKind::UnexpectedEof,
+                        "eof before stream header",
+                    ))
+                }
+            }
+        }
+    }
+}

tokio-xmpp/src/xmlstream/initiator.rs đź”—

@@ -0,0 +1,63 @@
+// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+use core::pin::Pin;
+use std::borrow::Cow;
+use std::io;
+
+use tokio::io::{AsyncBufRead, AsyncWrite};
+
+use xmpp_parsers::stream_features::StreamFeatures;
+
+use xso::{AsXml, FromXml};
+
+use super::{
+    common::{RawXmlStream, ReadXso, StreamHeader},
+    XmlStream,
+};
+
+/// Type state for an initiator stream which has sent and received the stream
+/// header.
+///
+/// To continue stream setup, call [`recv_features`][`Self::recv_features`].
+pub struct PendingFeaturesRecv<Io> {
+    pub(super) stream: RawXmlStream<Io>,
+    pub(super) header: StreamHeader<'static>,
+}
+
+impl<Io> PendingFeaturesRecv<Io> {
+    /// The stream header contents as sent by the peer.
+    pub fn header(&self) -> StreamHeader<'_> {
+        StreamHeader {
+            from: self.header.from.as_ref().map(|x| Cow::Borrowed(&**x)),
+            to: self.header.to.as_ref().map(|x| Cow::Borrowed(&**x)),
+            id: self.header.id.as_ref().map(|x| Cow::Borrowed(&**x)),
+        }
+    }
+
+    /// Extract the stream header contents as sent by the peer.
+    pub fn take_header(&mut self) -> StreamHeader<'static> {
+        self.header.take()
+    }
+}
+
+impl<Io: AsyncBufRead + AsyncWrite + Unpin> PendingFeaturesRecv<Io> {
+    /// Receive the responder's stream features.
+    ///
+    /// After the stream features have been received, the stream can be used
+    /// for exchanging stream-level elements (stanzas or "nonzas"). The Rust
+    /// type for these elements must be given as type parameter `T`.
+    pub async fn recv_features<T: FromXml + AsXml>(
+        self,
+    ) -> io::Result<(StreamFeatures, XmlStream<Io, T>)> {
+        let Self {
+            mut stream,
+            header: _,
+        } = self;
+        let features = ReadXso::read_from(Pin::new(&mut stream)).await?;
+        Ok((features, XmlStream::wrap(stream)))
+    }
+}

tokio-xmpp/src/xmlstream/mod.rs đź”—

@@ -0,0 +1,330 @@
+// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+//! # RFC 6120 XML Streams
+//!
+//! **Note:** The XML stream is a low-level API which you should probably not
+//! use directly.
+//!
+//! Establishing an XML stream is always a multi-stage process due to how
+//! stream negotiation works. Based on the values sent by the initiator in the
+//! stream header, the responder may choose to offer different features.
+//!
+//! In order to allow this, the following multi-step processes are defined.
+//!
+//! ## Initiating an XML stream
+//!
+//! To initiate an XML stream, you need to:
+//!
+//! 1. Call [`initiate_stream`] to obtain the [`PendingFeaturesRecv`] object.
+//!    That object holds the stream header sent by the peer for inspection.
+//! 2. Call [`PendingFeaturesRecv::recv_features`] if you are content with
+//!    the content of the stream header to obtain the [`XmlStream`] object and
+//!    the features sent by the peer.
+//!
+//! ## Accepting an XML stream connection
+//!
+//! To accept an XML stream, you need to:
+//!
+//! 1. Call [`accept_stream`] to obtain the [`AcceptedStream`] object.
+//!    That object holds the stream header sent by the peer for inspection.
+//! 2. Call [`AcceptedStream::send_header`] if you are content with
+//!    the content of the stream header to obtain the [`PendingFeaturesSend`]
+//!    object.
+//! 3. Call [`PendingFeaturesSend::send_features`] to send the stream features
+//!    to the peer and obtain the [`XmlStream`] object.
+
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use std::io;
+
+use futures::{ready, Sink, SinkExt, Stream};
+
+use tokio::io::{AsyncBufRead, AsyncWrite};
+
+use xso::{AsXml, FromXml, Item};
+
+mod common;
+mod initiator;
+mod responder;
+#[cfg(test)]
+mod tests;
+
+use self::common::{RawXmlStream, ReadXsoError, ReadXsoState, StreamHeader};
+pub use self::initiator::PendingFeaturesRecv;
+pub use self::responder::{AcceptedStream, PendingFeaturesSend};
+
+/// Initiate a new stream
+///
+/// Initiate a new stream using the given I/O object `io`. The default
+/// XML namespace will be set to `stream_ns` and the stream header will use
+/// the attributes as set in `stream_header`, along with version `1.0`.
+///
+/// The returned object contains the stream header sent by the remote side
+/// as well as the internal parser state to continue the negotiation.
+pub async fn initiate_stream<Io: AsyncBufRead + AsyncWrite + Unpin>(
+    io: Io,
+    stream_ns: &'static str,
+    stream_header: StreamHeader<'_>,
+) -> Result<PendingFeaturesRecv<Io>, io::Error> {
+    let mut raw_stream = RawXmlStream::new(io, stream_ns);
+    stream_header.send(Pin::new(&mut raw_stream)).await?;
+    raw_stream.flush().await?;
+
+    let header = StreamHeader::recv(Pin::new(&mut raw_stream)).await?;
+
+    Ok(PendingFeaturesRecv {
+        stream: raw_stream,
+        header,
+    })
+}
+
+/// Accept a new XML stream as responder
+///
+/// Prepares the responer side of an XML stream using the given I/O object
+/// `io`. The default XML namespace will be set to `stream_ns`.
+///
+/// The returned object contains the stream header sent by the remote side
+/// as well as the internal parser state to continue the negotiation.
+pub async fn accept_stream<Io: AsyncBufRead + AsyncWrite + Unpin>(
+    io: Io,
+    stream_ns: &'static str,
+) -> Result<AcceptedStream<Io>, io::Error> {
+    let mut stream = RawXmlStream::new(io, stream_ns);
+    let header = StreamHeader::recv(Pin::new(&mut stream)).await?;
+    Ok(AcceptedStream { stream, header })
+}
+
+/// A non-success state which may occur while reading an XSO from a
+/// [`XmlStream`]
+#[derive(Debug)]
+pub enum ReadError {
+    /// The soft timeout of the stream triggered.
+    ///
+    /// User code should handle this by sending something into the stream
+    /// which causes the peer to send data before the hard timeout triggers.
+    SoftTimeout,
+
+    /// An I/O error occurred in the underlying I/O object.
+    ///
+    /// This is generally fatal.
+    HardError(io::Error),
+
+    /// A parse error occurred while processing the XSO.
+    ///
+    /// This is non-fatal and more XSOs may be read from the stream.
+    ParseError(xso::error::Error),
+
+    /// The stream footer was received.
+    ///
+    /// Any future read attempts will again return this error. The stream has
+    /// been closed by the peer and you should probably close it, too.
+    StreamFooterReceived,
+}
+
+enum WriteState {
+    Open,
+    SendElementFoot,
+    FooterSent,
+    Failed,
+}
+
+impl WriteState {
+    fn check_ok(&self) -> io::Result<()> {
+        match self {
+            WriteState::Failed => Err(io::Error::new(
+                io::ErrorKind::NotConnected,
+                "XML stream sink unusable because of previous write error",
+            )),
+            WriteState::Open | WriteState::SendElementFoot | WriteState::FooterSent => Ok(()),
+        }
+    }
+
+    fn check_writable(&self) -> io::Result<()> {
+        match self {
+            WriteState::SendElementFoot | WriteState::FooterSent => Err(io::Error::new(
+                io::ErrorKind::NotConnected,
+                "stream footer already sent",
+            )),
+            WriteState::Failed | WriteState::Open => self.check_ok(),
+        }
+    }
+}
+
+pin_project_lite::pin_project! {
+    /// XML stream
+    ///
+    /// This struct represents an
+    /// [RFC 6120](https://tools.ietf.org/html/rfc6120) XML stream, where the
+    /// payload consists of items of type `T` implementing [`FromXml`] and
+    /// [`AsXml`].
+    pub struct XmlStream<Io, T: FromXml> {
+        #[pin]
+        inner: RawXmlStream<Io>,
+        read_state: Option<ReadXsoState<T>>,
+        write_state: WriteState,
+    }
+}
+
+impl<Io: AsyncBufRead, T: FromXml + AsXml> XmlStream<Io, T> {
+    fn wrap(inner: RawXmlStream<Io>) -> Self {
+        Self {
+            inner,
+            read_state: Some(ReadXsoState::default()),
+            write_state: WriteState::Open,
+        }
+    }
+
+    fn assert_retypable(&self) {
+        match self.read_state {
+            Some(ReadXsoState::PreData) => (),
+            Some(_) => panic!("cannot reset stream: XSO parsing in progress!"),
+            None => panic!("cannot reset stream: stream footer received!"),
+        }
+        match self.write_state.check_writable() {
+            Ok(()) => (),
+            Err(e) => panic!("cannot reset stream: {}", e),
+        }
+    }
+}
+
+impl<Io: AsyncBufRead + AsyncWrite + Unpin, T: FromXml + AsXml> XmlStream<Io, T> {
+    /// Initiate a stream reset
+    ///
+    /// The `header` is the new stream header which is sent to the remote
+    /// party.
+    ///
+    /// # Panics
+    ///
+    /// Attempting to reset the stream while an object is being received will
+    /// panic. This can generally only happen if you call `poll_next`
+    /// directly, as doing that is otherwise prevented by the borrowchecker.
+    ///
+    /// In addition, attempting to reset a stream which has been closed by
+    /// either side or which has had an I/O error will also cause a panic.
+    pub async fn initiate_reset(
+        self,
+        header: StreamHeader<'_>,
+    ) -> io::Result<PendingFeaturesRecv<Io>> {
+        self.assert_retypable();
+
+        let mut stream = self.inner;
+        Pin::new(&mut stream).reset_state();
+        header.send(Pin::new(&mut stream)).await?;
+        stream.flush().await?;
+        let header = StreamHeader::recv(Pin::new(&mut stream)).await?;
+        Ok(PendingFeaturesRecv { stream, header })
+    }
+
+    /// Anticipate a new stream header sent by the remote party.
+    ///
+    /// This is the responder-side counterpart to
+    /// [`initiate_reset`][`Self::initiate_reset`].
+    ///
+    /// # Panics
+    ///
+    /// Attempting to reset the stream while an object is being received will
+    /// panic. This can generally only happen if you call `poll_next`
+    /// directly, as doing that is otherwise prevented by the borrowchecker.
+    ///
+    /// In addition, attempting to reset a stream which has been closed by
+    /// either side or which has had an I/O error will also cause a panic.
+    pub async fn accept_reset(self) -> io::Result<AcceptedStream<Io>> {
+        self.assert_retypable();
+
+        let mut stream = self.inner;
+        Pin::new(&mut stream).reset_state();
+        let header = StreamHeader::recv(Pin::new(&mut stream)).await?;
+        Ok(AcceptedStream { stream, header })
+    }
+}
+
+impl<Io: AsyncBufRead, T: FromXml + AsXml> Stream for XmlStream<Io, T> {
+    type Item = Result<T, ReadError>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.project();
+        let result = match this.read_state.as_mut() {
+            None => return Poll::Ready(Some(Err(ReadError::StreamFooterReceived))),
+            Some(read_state) => ready!(read_state.poll_advance(this.inner, cx)),
+        };
+        let result = match result {
+            Ok(v) => Poll::Ready(Some(Ok(v))),
+            Err(ReadXsoError::Hard(e)) => Poll::Ready(Some(Err(ReadError::HardError(e)))),
+            Err(ReadXsoError::Parse(e)) => Poll::Ready(Some(Err(ReadError::ParseError(e)))),
+            Err(ReadXsoError::Footer) => {
+                *this.read_state = None;
+                Poll::Ready(Some(Err(ReadError::StreamFooterReceived)))
+            }
+        };
+        *this.read_state = Some(ReadXsoState::default());
+        result
+    }
+}
+
+impl<'x, Io: AsyncWrite, T: FromXml + AsXml> Sink<&'x T> for XmlStream<Io, T> {
+    type Error = io::Error;
+
+    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        let this = self.project();
+        this.write_state.check_writable()?;
+        this.inner.poll_ready(cx)
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        let this = self.project();
+        this.write_state.check_writable()?;
+        this.inner.poll_flush(cx)
+    }
+
+    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        let mut this = self.project();
+        this.write_state.check_ok()?;
+        loop {
+            match this.write_state {
+                // Open => initiate closing.
+                WriteState::Open => {
+                    *this.write_state = WriteState::SendElementFoot;
+                }
+                // Sending => wait for readiness, then send.
+                WriteState::SendElementFoot => {
+                    match ready!(this.inner.as_mut().poll_ready(cx))
+                        .and_then(|_| this.inner.as_mut().start_send(Item::ElementFoot))
+                    {
+                        Ok(()) => (),
+                        // If it fails, we fail the sink immediately.
+                        Err(e) => {
+                            *this.write_state = WriteState::Failed;
+                            return Poll::Ready(Err(e));
+                        }
+                    }
+                    *this.write_state = WriteState::FooterSent;
+                }
+                // Footer sent => just poll the inner sink for closure.
+                WriteState::FooterSent => break,
+                WriteState::Failed => unreachable!(), // caught by check_ok()
+            }
+        }
+        this.inner.poll_close(cx)
+    }
+
+    fn start_send(self: Pin<&mut Self>, item: &'x T) -> Result<(), Self::Error> {
+        let mut this = self.project();
+        this.write_state.check_writable()?;
+        let iter = match item.as_xml_iter() {
+            Ok(v) => v,
+            Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
+        };
+        for item in iter {
+            let item = match item {
+                Ok(v) => v,
+                Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
+            };
+            this.inner.as_mut().start_send(item)?;
+        }
+        Ok(())
+    }
+}

tokio-xmpp/src/xmlstream/responder.rs đź”—

@@ -0,0 +1,98 @@
+// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+use core::pin::Pin;
+use std::borrow::Cow;
+use std::io;
+
+use futures::SinkExt;
+
+use tokio::io::{AsyncBufRead, AsyncWrite};
+
+use xmpp_parsers::stream_features::StreamFeatures;
+
+use xso::{AsXml, FromXml};
+
+use super::{
+    common::{RawXmlStream, StreamHeader},
+    XmlStream,
+};
+
+/// Type state for a responder stream which has received a stream header
+///
+/// To continue stream setup, call [`send_header`][`Self::send_header`].
+pub struct AcceptedStream<Io> {
+    pub(super) stream: RawXmlStream<Io>,
+    pub(super) header: StreamHeader<'static>,
+}
+
+impl<Io> AcceptedStream<Io> {
+    /// The stream header contents as sent by the peer.
+    pub fn header(&self) -> StreamHeader<'_> {
+        StreamHeader {
+            from: self.header.from.as_ref().map(|x| Cow::Borrowed(&**x)),
+            to: self.header.to.as_ref().map(|x| Cow::Borrowed(&**x)),
+            id: self.header.id.as_ref().map(|x| Cow::Borrowed(&**x)),
+        }
+    }
+
+    /// Extract the stream header contents as sent by the peer.
+    pub fn take_header(&mut self) -> StreamHeader<'static> {
+        self.header.take()
+    }
+}
+
+impl<Io: AsyncBufRead + AsyncWrite + Unpin> AcceptedStream<Io> {
+    /// Send a stream header.
+    ///
+    /// Sends the given stream header to the initiator. Returns a new object
+    /// which is prepared to send the stream features.
+    pub async fn send_header(
+        self,
+        header: StreamHeader<'_>,
+    ) -> io::Result<PendingFeaturesSend<Io>> {
+        let Self {
+            mut stream,
+            header: _,
+        } = self;
+
+        header.send(Pin::new(&mut stream)).await?;
+        Ok(PendingFeaturesSend { stream })
+    }
+}
+
+/// Type state for a responder stream which has received and sent the stream
+/// header.
+///
+/// To continue stream setup, call [`send_features`][`Self::send_features`].
+pub struct PendingFeaturesSend<Io> {
+    pub(super) stream: RawXmlStream<Io>,
+}
+
+impl<Io: AsyncBufRead + AsyncWrite + Unpin> PendingFeaturesSend<Io> {
+    /// Send the responder's stream features.
+    ///
+    /// After the stream features have been sent, the stream can be used for
+    /// exchanging stream-level elements (stanzas or "nonzas"). The Rust type
+    /// for these elements must be given as type parameter `T`.
+    pub async fn send_features<T: FromXml + AsXml>(
+        self,
+        features: &'_ StreamFeatures,
+    ) -> io::Result<XmlStream<Io, T>> {
+        let Self { mut stream } = self;
+        let iter = features
+            .as_xml_iter()
+            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
+
+        for item in iter {
+            let item = item.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
+            stream.send(item).await?;
+        }
+        stream.flush().await?;
+
+        Ok(XmlStream::wrap(stream))
+    }
+}

tokio-xmpp/src/xmlstream/tests.rs đź”—

@@ -0,0 +1,264 @@
+// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+use futures::StreamExt;
+
+use xmpp_parsers::stream_features::StreamFeatures;
+
+use super::*;
+
+#[derive(FromXml, AsXml, Debug)]
+#[xml(namespace = "urn:example", name = "data")]
+struct Data {
+    #[xml(text)]
+    contents: String,
+}
+
+#[tokio::test]
+async fn test_initiate_accept_stream() {
+    let (lhs, rhs) = tokio::io::duplex(65536);
+    let initiator = tokio::spawn(async move {
+        let mut stream = initiate_stream(
+            tokio::io::BufStream::new(lhs),
+            "jabber:client",
+            StreamHeader {
+                from: Some("client".into()),
+                to: Some("server".into()),
+                id: Some("client-id".into()),
+            },
+        )
+        .await?;
+        Ok::<_, io::Error>(stream.take_header())
+    });
+    let responder = tokio::spawn(async move {
+        let stream = accept_stream(tokio::io::BufStream::new(rhs), "jabber:client").await?;
+        assert_eq!(stream.header().from.unwrap(), "client");
+        assert_eq!(stream.header().to.unwrap(), "server");
+        assert_eq!(stream.header().id.unwrap(), "client-id");
+        stream
+            .send_header(StreamHeader {
+                from: Some("server".into()),
+                to: Some("client".into()),
+                id: Some("server-id".into()),
+            })
+            .await
+    });
+    responder.await.unwrap().expect("responder");
+    let server_header = initiator.await.unwrap().expect("initiator");
+    assert_eq!(server_header.from.unwrap(), "server");
+    assert_eq!(server_header.to.unwrap(), "client");
+    assert_eq!(server_header.id.unwrap(), "server-id");
+}
+
+#[tokio::test]
+async fn test_exchange_stream_features() {
+    let (lhs, rhs) = tokio::io::duplex(65536);
+    let initiator = tokio::spawn(async move {
+        let stream = initiate_stream(
+            tokio::io::BufStream::new(lhs),
+            "jabber:client",
+            StreamHeader::default(),
+        )
+        .await?;
+        let (features, _) = stream.recv_features::<Data>().await?;
+        Ok::<_, io::Error>(features)
+    });
+    let responder = tokio::spawn(async move {
+        let stream = accept_stream(tokio::io::BufStream::new(rhs), "jabber:client").await?;
+        let stream = stream.send_header(StreamHeader::default()).await?;
+        stream
+            .send_features::<Data>(&StreamFeatures::default())
+            .await?;
+        Ok::<_, io::Error>(())
+    });
+    responder.await.unwrap().expect("responder failed");
+    let features = initiator.await.unwrap().expect("initiator failed");
+    assert_eq!(features, StreamFeatures::default());
+}
+
+#[tokio::test]
+async fn test_exchange_data() {
+    let (lhs, rhs) = tokio::io::duplex(65536);
+
+    let initiator = tokio::spawn(async move {
+        let stream = initiate_stream(
+            tokio::io::BufStream::new(lhs),
+            "jabber:client",
+            StreamHeader::default(),
+        )
+        .await?;
+        let (_, mut stream) = stream.recv_features::<Data>().await?;
+        stream
+            .send(&Data {
+                contents: "hello".to_owned(),
+            })
+            .await?;
+        match stream.next().await {
+            Some(Ok(Data { contents })) => assert_eq!(contents, "world!"),
+            other => panic!("unexpected stream message: {:?}", other),
+        }
+        Ok::<_, io::Error>(())
+    });
+
+    let responder = tokio::spawn(async move {
+        let stream = accept_stream(tokio::io::BufStream::new(rhs), "jabber:client").await?;
+        let stream = stream.send_header(StreamHeader::default()).await?;
+        let mut stream = stream
+            .send_features::<Data>(&StreamFeatures::default())
+            .await?;
+        stream
+            .send(&Data {
+                contents: "world!".to_owned(),
+            })
+            .await?;
+        match stream.next().await {
+            Some(Ok(Data { contents })) => assert_eq!(contents, "hello"),
+            other => panic!("unexpected stream message: {:?}", other),
+        }
+        Ok::<_, io::Error>(())
+    });
+
+    responder.await.unwrap().expect("responder failed");
+    initiator.await.unwrap().expect("initiator failed");
+}
+
+#[tokio::test]
+async fn test_clean_shutdown() {
+    let (lhs, rhs) = tokio::io::duplex(65536);
+
+    let initiator = tokio::spawn(async move {
+        let stream = initiate_stream(
+            tokio::io::BufStream::new(lhs),
+            "jabber:client",
+            StreamHeader::default(),
+        )
+        .await?;
+        let (_, mut stream) = stream.recv_features::<Data>().await?;
+        stream.close().await?;
+        match stream.next().await {
+            Some(Err(ReadError::StreamFooterReceived)) => (),
+            other => panic!("unexpected stream message: {:?}", other),
+        }
+        Ok::<_, io::Error>(())
+    });
+
+    let responder = tokio::spawn(async move {
+        let stream = accept_stream(tokio::io::BufStream::new(rhs), "jabber:client").await?;
+        let stream = stream.send_header(StreamHeader::default()).await?;
+        let mut stream = stream
+            .send_features::<Data>(&StreamFeatures::default())
+            .await?;
+        match stream.next().await {
+            Some(Err(ReadError::StreamFooterReceived)) => (),
+            other => panic!("unexpected stream message: {:?}", other),
+        }
+        stream.close().await?;
+        Ok::<_, io::Error>(())
+    });
+
+    responder.await.unwrap().expect("responder failed");
+    initiator.await.unwrap().expect("initiator failed");
+}
+
+#[tokio::test]
+async fn test_exchange_data_stream_reset_and_shutdown() {
+    let (lhs, rhs) = tokio::io::duplex(65536);
+
+    let initiator = tokio::spawn(async move {
+        let stream = initiate_stream(
+            tokio::io::BufStream::new(lhs),
+            "jabber:client",
+            StreamHeader::default(),
+        )
+        .await?;
+        let (_, mut stream) = stream.recv_features::<Data>().await?;
+        stream
+            .send(&Data {
+                contents: "hello".to_owned(),
+            })
+            .await?;
+        match stream.next().await {
+            Some(Ok(Data { contents })) => assert_eq!(contents, "world!"),
+            other => panic!("unexpected stream message: {:?}", other),
+        }
+        let stream = stream
+            .initiate_reset(StreamHeader {
+                from: Some("client".into()),
+                to: Some("server".into()),
+                id: Some("client-id".into()),
+            })
+            .await?;
+        assert_eq!(stream.header().from.unwrap(), "server");
+        assert_eq!(stream.header().to.unwrap(), "client");
+        assert_eq!(stream.header().id.unwrap(), "server-id");
+
+        let (_, mut stream) = stream.recv_features::<Data>().await?;
+        stream
+            .send(&Data {
+                contents: "once more".to_owned(),
+            })
+            .await?;
+        stream.close().await?;
+        match stream.next().await {
+            Some(Ok(Data { contents })) => assert_eq!(contents, "hello world!"),
+            other => panic!("unexpected stream message: {:?}", other),
+        }
+        match stream.next().await {
+            Some(Err(ReadError::StreamFooterReceived)) => (),
+            other => panic!("unexpected stream message: {:?}", other),
+        }
+        Ok::<_, io::Error>(())
+    });
+
+    let responder = tokio::spawn(async move {
+        let stream = accept_stream(tokio::io::BufStream::new(rhs), "jabber:client").await?;
+        let stream = stream.send_header(StreamHeader::default()).await?;
+        let mut stream = stream
+            .send_features::<Data>(&StreamFeatures::default())
+            .await?;
+        stream
+            .send(&Data {
+                contents: "world!".to_owned(),
+            })
+            .await?;
+        match stream.next().await {
+            Some(Ok(Data { contents })) => assert_eq!(contents, "hello"),
+            other => panic!("unexpected stream message: {:?}", other),
+        }
+        let stream = stream.accept_reset().await?;
+        assert_eq!(stream.header().from.unwrap(), "client");
+        assert_eq!(stream.header().to.unwrap(), "server");
+        assert_eq!(stream.header().id.unwrap(), "client-id");
+        let stream = stream
+            .send_header(StreamHeader {
+                from: Some("server".into()),
+                to: Some("client".into()),
+                id: Some("server-id".into()),
+            })
+            .await?;
+        let mut stream = stream
+            .send_features::<Data>(&StreamFeatures::default())
+            .await?;
+        stream
+            .send(&Data {
+                contents: "hello world!".to_owned(),
+            })
+            .await?;
+        match stream.next().await {
+            Some(Ok(Data { contents })) => assert_eq!(contents, "once more"),
+            other => panic!("unexpected stream message: {:?}", other),
+        }
+        stream.close().await?;
+        match stream.next().await {
+            Some(Err(ReadError::StreamFooterReceived)) => (),
+            other => panic!("unexpected stream message: {:?}", other),
+        }
+        Ok::<_, io::Error>(())
+    });
+
+    responder.await.unwrap().expect("responder failed");
+    initiator.await.unwrap().expect("initiator failed");
+}