From c7238973267bc9428dbf72da30e97e7e0b19fa7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Sch=C3=A4fer?= Date: Wed, 7 Aug 2024 15:52:55 +0200 Subject: [PATCH] tokio_xmpp: introduce xmlstream module This module provides XSO-based parsing, proper typestates and will soon replace the proto module. --- tokio-xmpp/Cargo.toml | 2 + tokio-xmpp/ChangeLog | 4 +- tokio-xmpp/src/lib.rs | 1 + tokio-xmpp/src/xmlstream/common.rs | 591 ++++++++++++++++++++++++++ tokio-xmpp/src/xmlstream/initiator.rs | 63 +++ tokio-xmpp/src/xmlstream/mod.rs | 330 ++++++++++++++ tokio-xmpp/src/xmlstream/responder.rs | 98 +++++ tokio-xmpp/src/xmlstream/tests.rs | 264 ++++++++++++ 8 files changed, 1351 insertions(+), 2 deletions(-) create mode 100644 tokio-xmpp/src/xmlstream/common.rs create mode 100644 tokio-xmpp/src/xmlstream/initiator.rs create mode 100644 tokio-xmpp/src/xmlstream/mod.rs create mode 100644 tokio-xmpp/src/xmlstream/responder.rs create mode 100644 tokio-xmpp/src/xmlstream/tests.rs diff --git a/tokio-xmpp/Cargo.toml b/tokio-xmpp/Cargo.toml index 095ac749c89c332557a382764a056c984f7e35cf..709291e464e0d389c13ab78ce35f733ba79511c3 100644 --- a/tokio-xmpp/Cargo.toml +++ b/tokio-xmpp/Cargo.toml @@ -23,10 +23,12 @@ rustls-native-certs = { version = "0.7", optional = true } rxml = { version = "0.12.0", features = ["compact_str"] } rand = "0.8" syntect = { version = "5", optional = true } +pin-project-lite = { version = "0.2" } # same repository dependencies sasl = { version = "0.5", path = "../sasl" } xmpp-parsers = { version = "0.21", path = "../parsers" } minidom = { version = "0.16", path = "../minidom" } +xso = { version = "0.1", path = "../xso" } # these are only needed for starttls ServerConnector support hickory-resolver = { version = "0.24", optional = true} diff --git a/tokio-xmpp/ChangeLog b/tokio-xmpp/ChangeLog index 5d377c4d506cafc07787558ae3c5d066b0d56268..f915bffa774d41786d7b116d6d2c3034229a0e7e 100644 --- a/tokio-xmpp/ChangeLog +++ b/tokio-xmpp/ChangeLog @@ -15,8 +15,6 @@ 
XXXX-YY-ZZ RELEASER - `AsyncClient::poll_next` properly closes stream with `Poll::Ready(None)` when disconnecting without auto reconnect (!436) - remove `tokio_xmpp::SimpleClient` because it was not widely used, and not well documented ; if you need it, please let us know and it will be reintegrated (!428) - - `XMPPStream` was renamed `XmppStream` and is now published as `tokio_xmpp::proto::XmppStream` (!428) - - `XmppCodec` was moved to proto module and is now published as `tokio_xmpp::proto::XmppCodec` (!428) - `Component::new` and `Client::new only require jid/password argument (!428) - `ServerConfig` and `Client::new_with_config` have been removed (!428) - ``Client` now has `new_plaintext`, `new_starttls` and `new_with_connector` method (!428) @@ -25,6 +23,8 @@ XXXX-YY-ZZ RELEASER - `Component` now has `new_plaintext` and `new_with_connector` constructors, just like `Client` but without StartTLS (!428) - `tokio_xmpp::AsyncClient` has been renamed `tokio_xmpp::Client` (!428) - `Component` is now gated behind `insecure-tcp` feature flag + - `XMPPStream` and `XmppCodec` were removed in favour of the newly + implemented `tokio_xmpp::xmlstream` module. Version 4.0.0: 2024-07-26 Maxime “pep” Buquet diff --git a/tokio-xmpp/src/lib.rs b/tokio-xmpp/src/lib.rs index 347e164c6705f3b7eb5070dd90b47891d1a4e3aa..a10cf5f040a566fbec21b6856465198a1869578d 100644 --- a/tokio-xmpp/src/lib.rs +++ b/tokio-xmpp/src/lib.rs @@ -50,6 +50,7 @@ mod event; pub use event::Event; pub mod connect; pub mod proto; +pub mod xmlstream; mod client; pub use client::Client; diff --git a/tokio-xmpp/src/xmlstream/common.rs b/tokio-xmpp/src/xmlstream/common.rs new file mode 100644 index 0000000000000000000000000000000000000000..a30286b3fd8d0e73a649aa2b56404d45e9336bea --- /dev/null +++ b/tokio-xmpp/src/xmlstream/common.rs @@ -0,0 +1,591 @@ +// Copyright (c) 2024 Jonas Schäfer +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0.
If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use std::borrow::Cow; +use std::io; + +use futures::{ready, Sink, SinkExt, Stream, StreamExt}; + +use bytes::{Buf, BytesMut}; + +use tokio::io::{AsyncBufRead, AsyncWrite}; + +use xso::{ + exports::rxml::{self, writer::TrackNamespace, xml_ncname, Event, Namespace}, + FromEventsBuilder, FromXml, Item, +}; + +use xmpp_parsers::ns::STREAM as XML_STREAM_NS; + +pin_project_lite::pin_project! { + // NOTE: due to limitations of pin_project_lite, the field comments are + // no doc comments. Luckily, this struct is only `pub(super)` anyway. + #[project = RawXmlStreamProj] + pub(super) struct RawXmlStream { + // The parser used for deserialising data. + #[pin] + parser: rxml::AsyncReader, + + // The writer used for serialising data. + writer: rxml::writer::Encoder, + + // The default namespace to declare on the stream header. + stream_ns: &'static str, + + // Buffer containing serialised data which will then be sent through + // the inner `Io`. Sending that serialised data happens in + // `poll_ready` and `poll_flush`, while appending serialised data + // happens in `start_send`. + tx_buffer: BytesMut, + + // This signifies the limit at the point of which the Sink will + // refuse to accept more data: if the `tx_buffer`'s size grows beyond + // that high water mark, poll_ready will return Poll::Pending until + // it has managed to flush enough data down the inner writer. + // + // Note that poll_ready will always attempt to progress the writes, + // which further reduces the chance of hitting this limit unless + // either the underlying writer gets stuck (e.g. TCP connection + // breaking in a timeouty way) or a lot of data is written in bulk. + // In both cases, the backpressure created by poll_ready returning + // Pending is desirable. 
+ // + // However, there is a catch: We don't assert this condition + // in `start_send` at all. The reason is that we cannot suspend + // serialisation of an XSO in the middle of writing it: it has to be + // written in one batch or you have to start over later (this has to + // do with the iterator state borrowing the data and futures getting + // cancelled e.g. in tokio::select!). In order to facilitate + // implementing a `Sink` on top of `RawXmlStream`, we + // cannot be strict about what is going on in `start_send`: + // `poll_ready` does not know what kind of data will be written (so + // it could not make a size estimate, even if that was at all + // possible with AsXml) and `start_send` is not a coroutine. So if + // `Sink` wants to use `RawXmlStream`, it must be able to + // submit an entire XSO's items in one batch to `RawXmlStream` after + // it has reported to be ready once. That may easily make the buffer + // reach its high water mark. + // + // So if we checked that condition in `start_send` (as opposed to + // `poll_ready`), we would cause situations where submitting XSOs + // failed randomly (with a panic or other errors) and would have to + // be retried later. + // + // While failing with e.g. io::ErrorKind::WouldBlock is something + // that could be investigated later, it would still require being + // able to make an accurate estimate of the number of bytes needed to + // serialise any given `AsXml`, because as pointed out earlier, once + // we have started, there is no going back. + // + // Finally, none of that hurts much because `RawXmlStream` is only an + // internal API. The high-level APIs will always call `poll_ready` + // before sending an XSO, which means that we won't *grossly* go over + // the TX buffer high water mark---unless you send a really large + // XSO at once. 
+ tx_buffer_high_water_mark: usize, + } +} + +impl RawXmlStream { + fn new_writer( + stream_ns: &'static str, + ) -> rxml::writer::Encoder { + let mut writer = rxml::writer::Encoder::new(); + writer + .ns_tracker_mut() + .declare_fixed(Some(xml_ncname!("stream")), XML_STREAM_NS.into()); + writer + .ns_tracker_mut() + .declare_fixed(None, stream_ns.into()); + writer + } + + pub(super) fn new(io: Io, stream_ns: &'static str) -> Self { + let parser = rxml::Parser::default(); + Self { + parser: rxml::AsyncReader::wrap(io, parser), + writer: Self::new_writer(stream_ns), + stream_ns, + tx_buffer: BytesMut::new(), + + // This basically means: "if we already have 2 kiB in our send + // buffer, do not accept more data". + // Please see the extensive words at + //`Self::tx_buffer_high_water_mark` for details. + tx_buffer_high_water_mark: 2048, + } + } + + pub(super) fn reset_state(self: Pin<&mut Self>) { + let this = self.project(); + *this.parser.parser_pinned() = rxml::Parser::default(); + *this.writer = Self::new_writer(this.stream_ns); + } +} + +impl RawXmlStream { + fn parser_pinned(self: Pin<&mut Self>) -> &mut rxml::Parser { + self.project().parser.parser_pinned() + } +} + +impl Stream for RawXmlStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + loop { + return Poll::Ready( + match ready!(this.parser.as_mut().poll_read(cx)).transpose() { + // Skip the XML declaration, nobody wants to hear about that. 
+ Some(Ok(rxml::Event::XmlDeclaration(_, _))) => continue, + other => other, + }, + ); + } + } +} + +impl<'x, Io: AsyncWrite> RawXmlStreamProj<'x, Io> { + /// Write as much of `tx_buffer` to the inner writer as it accepts. + /// + /// Returns `Ready(Ok(()))` once the buffer is empty, `Pending` when the + /// inner writer is not ready, and an error if a write fails or makes no + /// progress. + fn progress_write(&mut self, cx: &mut Context<'_>) -> Poll> { + while !self.tx_buffer.is_empty() { + let written = match ready!(self + .parser + .as_mut() + .inner_pinned() + .poll_write(cx, &self.tx_buffer)) + { + // A zero-length write while data is still buffered means the + // writer accepts no more bytes; fail instead of spinning in + // this loop forever. + Ok(0) => { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write buffered XML stream data", + ))) + } + Ok(v) => v, + Err(e) => return Poll::Ready(Err(e)), + }; + self.tx_buffer.advance(written); + } + Poll::Ready(Ok(())) + } +} + +impl<'x, Io: AsyncWrite> Sink> for RawXmlStream { + type Error = io::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + match this.progress_write(cx) { + // No progress on write, but if we have enough space in the buffer + // it's ok nonetheless. + Poll::Pending => (), + // Some progress and it went fine, move on. + Poll::Ready(Ok(())) => (), + // Something went wrong -> return the error. + Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())), + } + if this.tx_buffer.len() < *this.tx_buffer_high_water_mark { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + ready!(this.progress_write(cx))?; + this.parser.as_mut().inner_pinned().poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + ready!(this.progress_write(cx))?; + this.parser.as_mut().inner_pinned().poll_shutdown(cx) + } + + fn start_send(self: Pin<&mut Self>, item: xso::Item<'x>) -> Result<(), Self::Error> { + let this = self.project(); + this.writer + .encode_into_bytes(item.as_rxml_item(), this.tx_buffer) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e)) + } +} + +/// Error returned by the [`ReadXso`] future and the [`ReadXsoState`] helper. +pub(super) enum ReadXsoError { + /// The outer element was closed before a child element could be read.
+ /// + /// This is typically the stream footer in XML stream applications. + Footer, + + /// A hard error occurred. + /// + /// This is either a real I/O error or an error from the XML parser. + /// Neither are recoverable, because the nesting state is lost and + /// in addition, XML errors are not recoverable because they indicate a + /// not well-formed document. + Hard(io::Error), + + /// A parse error occurred. + /// + /// The XML structure was well-formed, but the data contained did not + /// match the XSO which was attempted to be parsed. This error is + /// recoverable: when this error is emitted, the XML stream is at the same + /// nesting level as it was before the XSO was attempted to be read; all + /// XML structure which belonged to the XSO which failed to parse has + /// been consumed. This allows to read more XSOs even if one fails to + /// parse. + Parse(xso::error::Error), +} + +impl From for io::Error { + fn from(other: ReadXsoError) -> Self { + match other { + ReadXsoError::Hard(v) => v, + ReadXsoError::Parse(e) => io::Error::new(io::ErrorKind::InvalidData, e), + ReadXsoError::Footer => io::Error::new( + io::ErrorKind::UnexpectedEof, + "element footer while waiting for XSO element start", + ), + } + } +} + +impl From for ReadXsoError { + fn from(other: io::Error) -> Self { + Self::Hard(other) + } +} + +impl From for ReadXsoError { + fn from(other: xso::error::Error) -> Self { + Self::Parse(other) + } +} + +/// State for reading an XSO from a `Stream>`. +/// +/// Due to pinning, it is simpler to implement the statemachine in a dedicated +/// enum and let the actual (pinned) future pass the stream toward this enum's +/// function. +/// +/// This is used by both [`ReadXso`] and the [`super::XmlStream`] itself. +#[derive(Default)] +pub(super) enum ReadXsoState { + /// The [`rxml::Event::StartElement`] event was not seen yet. + /// + /// In this state, XML whitespace is ignored (as per RFC 6120 § 11.7), but + /// other text data is rejected. 
+ #[default] + PreData, + + /// The [`rxml::Event::StartElement`] event was received. + /// + /// The inner value is the builder for the "return type" of this enum and + /// the implementation in the [`xso`] crate does all the heavy lifting: + /// we'll only send events in its general direction. + // We use the fallible parsing here so that we don't have to do the depth + // accounting ourselves. + Parsing( as FromXml>::Builder), + + /// The parsing has completed (successful or not). + /// + /// This is a final state and attempting to advance the state will panic. + /// This is in accordance with [`core::future::Future::poll`]'s contract, + /// for which this enum is primarily used. + Done, +} + +impl ReadXsoState { + /// Progress reading the XSO from the given source. + /// + /// This attempts to parse a single XSO from the underlying stream, + /// while discarding any XML whitespace before the beginning of the XSO. + /// + /// If the XSO is parsed successfully, the method returns Ready with the + /// parsed value. If parsing fails or an I/O error occurs, an appropriate + /// error is returned. + /// + /// If parsing fails, the entire XML subtree belonging to the XSO is + /// nonetheless processed. That makes parse errors recoverable: After + /// `poll_advance` has returned Ready with either an Ok result or a + /// [`ReadXsoError::Parse`] error variant, another XSO can be read and the + /// XML parsing will be at the same nesting depth as it was before the + /// first call to `poll_advance`. + /// + /// Note that this guarantee does not hold for non-parse errors (i.e. for + /// the other variants of [`ReadXsoError`]): I/O errors as well as + /// occurrence of the outer closing element are fatal. + /// + /// The `source` passed to `poll_advance` should be the same on every + /// call. + pub(super) fn poll_advance( + &mut self, + mut source: Pin<&mut RawXmlStream>, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + // Disable text buffering before the start event. 
That way, we + // don't accumulate infinite amounts of XML whitespace caused by + // whitespace keepalives. + // (And also, we'll know faster when the remote side sends + // non-whitespace garbage.) + let text_buffering = match self { + ReadXsoState::PreData => false, + _ => true, + }; + source + .as_mut() + .parser_pinned() + .set_text_buffering(text_buffering); + let ev = ready!(source.as_mut().poll_next(cx)).transpose()?; + match self { + ReadXsoState::PreData => match ev { + Some(rxml::Event::XmlDeclaration(_, _)) => (), + Some(rxml::Event::Text(_, data)) => { + if xso::is_xml_whitespace(data.as_bytes()) { + continue; + } else { + *self = ReadXsoState::Done; + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::InvalidData, + "non-whitespace text content before XSO", + ) + .into())); + } + } + Some(rxml::Event::StartElement(_, name, attrs)) => { + *self = ReadXsoState::Parsing( + as FromXml>::from_events(name, attrs) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?, + ); + } + // Amounts to EOF, as we expect to start on the stream level. 
+ Some(rxml::Event::EndElement(_)) => { + *self = ReadXsoState::Done; + return Poll::Ready(Err(ReadXsoError::Footer)); + } + None => { + *self = ReadXsoState::Done; + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "end of parent element before XSO started", + ) + .into())); + } + }, + ReadXsoState::Parsing(builder) => { + let Some(ev) = ev else { + *self = ReadXsoState::Done; + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "eof during XSO parsing", + ) + .into())); + }; + + match builder.feed(ev) { + Err(err) => { + *self = ReadXsoState::Done; + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::InvalidData, + err, + ) + .into())); + } + Ok(Some(Err(err))) => { + *self = ReadXsoState::Done; + return Poll::Ready(Err(ReadXsoError::Parse(err))); + } + Ok(Some(Ok(value))) => { + *self = ReadXsoState::Done; + return Poll::Ready(Ok(value)); + } + Ok(None) => (), + } + } + + // The error talks about "future", simply because that is + // where `Self` is used (inside `core::future::Future::poll`). + ReadXsoState::Done => panic!("future polled after completion"), + } + } + } +} + +/// Future to read a single XSO from a stream. +pub(super) struct ReadXso<'x, Io, T: FromXml> { + /// Stream to read the future from. + inner: Pin<&'x mut RawXmlStream>, + + /// Current state of parsing. + state: ReadXsoState, +} + +impl<'x, Io: AsyncBufRead, T: FromXml> ReadXso<'x, Io, T> { + /// Start reading a single XSO from a stream. 
+ pub(super) fn read_from(stream: Pin<&'x mut RawXmlStream>) -> Self { + Self { + inner: stream, + state: ReadXsoState::PreData, + } + } +} + +impl<'x, Io: AsyncBufRead, T: FromXml> Future for ReadXso<'x, Io, T> +where + T::Builder: Unpin, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + this.state.poll_advance(this.inner.as_mut(), cx) + } +} + +/// Contains metadata from an XML stream header +#[derive(Default)] +pub struct StreamHeader<'x> { + /// The optional `from` attribute. + pub from: Option>, + + /// The optional `to` attribute. + pub to: Option>, + + /// The optional `id` attribute. + pub id: Option>, +} + +impl<'x> StreamHeader<'x> { + /// Take the contents and return them as new object. + /// + /// `self` will be left with all its parts set to `None`. + pub fn take(&mut self) -> Self { + Self { + from: self.from.take(), + to: self.to.take(), + id: self.id.take(), + } + } + + pub(super) async fn send( + self, + mut stream: Pin<&mut RawXmlStream>, + ) -> io::Result<()> { + stream + .send(Item::XmlDeclaration(rxml::XmlVersion::V1_0)) + .await?; + stream + .send(Item::ElementHeadStart( + Namespace::from(XML_STREAM_NS), + Cow::Borrowed(xml_ncname!("stream")), + )) + .await?; + if let Some(from) = self.from { + stream + .send(Item::Attribute( + Namespace::NONE, + Cow::Borrowed(xml_ncname!("from")), + from, + )) + .await?; + } + if let Some(to) = self.to { + stream + .send(Item::Attribute( + Namespace::NONE, + Cow::Borrowed(xml_ncname!("to")), + to, + )) + .await?; + } + if let Some(id) = self.id { + stream + .send(Item::Attribute( + Namespace::NONE, + Cow::Borrowed(xml_ncname!("id")), + id, + )) + .await?; + } + stream + .send(Item::Attribute( + Namespace::NONE, + Cow::Borrowed(xml_ncname!("version")), + Cow::Borrowed("1.0"), + )) + .await?; + stream.send(Item::ElementHeadEnd).await?; + Ok(()) + } +} + +impl StreamHeader<'static> { + pub(super) async fn recv( + mut stream: Pin<&mut 
RawXmlStream>, + ) -> io::Result { + loop { + match stream.as_mut().next().await.transpose()? { + Some(Event::StartElement(_, (ns, name), mut attrs)) => { + if ns != XML_STREAM_NS || name != "stream" { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "unknown stream header", + )); + } + + match attrs.remove(Namespace::none(), "version") { + Some(v) => { + if v != "1.0" { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("unsuppored stream version: {}", v), + )); + } + } + None => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "required `version` attribute missing", + )) + } + } + + let from = attrs.remove(Namespace::none(), "from"); + let to = attrs.remove(Namespace::none(), "to"); + let id = attrs.remove(Namespace::none(), "id"); + let _ = attrs.remove(Namespace::xml(), "lang"); + + if let Some(((ns, name), _)) = attrs.into_iter().next() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("unexpected stream header attribute: {{{}}}{}", ns, name), + )); + } + + return Ok(StreamHeader { + from: from.map(Cow::Owned), + to: to.map(Cow::Owned), + id: id.map(Cow::Owned), + }); + } + Some(Event::Text(_, _)) | Some(Event::EndElement(_)) => { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "unexpected content before stream header", + )) + } + // We cannot loop infinitely here because the XML parser will + // prevent more than one XML declaration from being parsed. 
+ Some(Event::XmlDeclaration(_, _)) => (), + None => { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "eof before stream header", + )) + } + } + } + } +} diff --git a/tokio-xmpp/src/xmlstream/initiator.rs b/tokio-xmpp/src/xmlstream/initiator.rs new file mode 100644 index 0000000000000000000000000000000000000000..76cc4d47cbc2d9c4d0286c52ccd0ac6f734b8e38 --- /dev/null +++ b/tokio-xmpp/src/xmlstream/initiator.rs @@ -0,0 +1,63 @@ +// Copyright (c) 2024 Jonas Schäfer +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +use core::pin::Pin; +use std::borrow::Cow; +use std::io; + +use tokio::io::{AsyncBufRead, AsyncWrite}; + +use xmpp_parsers::stream_features::StreamFeatures; + +use xso::{AsXml, FromXml}; + +use super::{ + common::{RawXmlStream, ReadXso, StreamHeader}, + XmlStream, +}; + +/// Type state for an initiator stream which has sent and received the stream +/// header. +/// +/// To continue stream setup, call [`recv_features`][`Self::recv_features`]. +pub struct PendingFeaturesRecv { + pub(super) stream: RawXmlStream, + pub(super) header: StreamHeader<'static>, +} + +impl PendingFeaturesRecv { + /// The stream header contents as sent by the peer. + pub fn header(&self) -> StreamHeader<'_> { + StreamHeader { + from: self.header.from.as_ref().map(|x| Cow::Borrowed(&**x)), + to: self.header.to.as_ref().map(|x| Cow::Borrowed(&**x)), + id: self.header.id.as_ref().map(|x| Cow::Borrowed(&**x)), + } + } + + /// Extract the stream header contents as sent by the peer. + pub fn take_header(&mut self) -> StreamHeader<'static> { + self.header.take() + } +} + +impl PendingFeaturesRecv { + /// Receive the responder's stream features. + /// + /// After the stream features have been received, the stream can be used + /// for exchanging stream-level elements (stanzas or "nonzas"). 
The Rust + /// type for these elements must be given as type parameter `T`. + pub async fn recv_features( + self, + ) -> io::Result<(StreamFeatures, XmlStream)> { + let Self { + mut stream, + header: _, + } = self; + let features = ReadXso::read_from(Pin::new(&mut stream)).await?; + Ok((features, XmlStream::wrap(stream))) + } +} diff --git a/tokio-xmpp/src/xmlstream/mod.rs b/tokio-xmpp/src/xmlstream/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..bac1d3de1149c12ae03fe9312174aceb2422d0c8 --- /dev/null +++ b/tokio-xmpp/src/xmlstream/mod.rs @@ -0,0 +1,330 @@ +// Copyright (c) 2024 Jonas Schäfer +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +//! # RFC 6120 XML Streams +//! +//! **Note:** The XML stream is a low-level API which you should probably not +//! use directly. +//! +//! Establishing an XML stream is always a multi-stage process due to how +//! stream negotiation works. Based on the values sent by the initiator in the +//! stream header, the responder may choose to offer different features. +//! +//! In order to allow this, the following multi-step processes are defined. +//! +//! ## Initiating an XML stream +//! +//! To initiate an XML stream, you need to: +//! +//! 1. Call [`initiate_stream`] to obtain the [`PendingFeaturesRecv`] object. +//! That object holds the stream header sent by the peer for inspection. +//! 2. Call [`PendingFeaturesRecv::recv_features`] if you are content with +//! the content of the stream header to obtain the [`XmlStream`] object and +//! the features sent by the peer. +//! +//! ## Accepting an XML stream connection +//! +//! To accept an XML stream, you need to: +//! +//! 1. Call [`accept_stream`] to obtain the [`AcceptedStream`] object. +//! That object holds the stream header sent by the peer for inspection. +//! 2. 
Call [`AcceptedStream::send_header`] if you are content with +//! the content of the stream header to obtain the [`PendingFeaturesSend`] +//! object. +//! 3. Call [`PendingFeaturesSend::send_features`] to send the stream features +//! to the peer and obtain the [`XmlStream`] object. + +use core::pin::Pin; +use core::task::{Context, Poll}; +use std::io; + +use futures::{ready, Sink, SinkExt, Stream}; + +use tokio::io::{AsyncBufRead, AsyncWrite}; + +use xso::{AsXml, FromXml, Item}; + +mod common; +mod initiator; +mod responder; +#[cfg(test)] +mod tests; + +use self::common::{RawXmlStream, ReadXsoError, ReadXsoState, StreamHeader}; +pub use self::initiator::PendingFeaturesRecv; +pub use self::responder::{AcceptedStream, PendingFeaturesSend}; + +/// Initiate a new stream +/// +/// Initiate a new stream using the given I/O object `io`. The default +/// XML namespace will be set to `stream_ns` and the stream header will use +/// the attributes as set in `stream_header`, along with version `1.0`. +/// +/// The returned object contains the stream header sent by the remote side +/// as well as the internal parser state to continue the negotiation. +pub async fn initiate_stream( + io: Io, + stream_ns: &'static str, + stream_header: StreamHeader<'_>, +) -> Result, io::Error> { + let mut raw_stream = RawXmlStream::new(io, stream_ns); + stream_header.send(Pin::new(&mut raw_stream)).await?; + raw_stream.flush().await?; + + let header = StreamHeader::recv(Pin::new(&mut raw_stream)).await?; + + Ok(PendingFeaturesRecv { + stream: raw_stream, + header, + }) +} + +/// Accept a new XML stream as responder +/// +/// Prepares the responder side of an XML stream using the given I/O object +/// `io`. The default XML namespace will be set to `stream_ns`. +/// +/// The returned object contains the stream header sent by the remote side +/// as well as the internal parser state to continue the negotiation.
+pub async fn accept_stream( + io: Io, + stream_ns: &'static str, +) -> Result, io::Error> { + let mut stream = RawXmlStream::new(io, stream_ns); + let header = StreamHeader::recv(Pin::new(&mut stream)).await?; + Ok(AcceptedStream { stream, header }) +} + +/// A non-success state which may occur while reading an XSO from a +/// [`XmlStream`] +#[derive(Debug)] +pub enum ReadError { + /// The soft timeout of the stream triggered. + /// + /// User code should handle this by sending something into the stream + /// which causes the peer to send data before the hard timeout triggers. + SoftTimeout, + + /// An I/O error occurred in the underlying I/O object. + /// + /// This is generally fatal. + HardError(io::Error), + + /// A parse error occurred while processing the XSO. + /// + /// This is non-fatal and more XSOs may be read from the stream. + ParseError(xso::error::Error), + + /// The stream footer was received. + /// + /// Any future read attempts will again return this error. The stream has + /// been closed by the peer and you should probably close it, too. + StreamFooterReceived, +} + +enum WriteState { + Open, + SendElementFoot, + FooterSent, + Failed, +} + +impl WriteState { + fn check_ok(&self) -> io::Result<()> { + match self { + WriteState::Failed => Err(io::Error::new( + io::ErrorKind::NotConnected, + "XML stream sink unusable because of previous write error", + )), + WriteState::Open | WriteState::SendElementFoot | WriteState::FooterSent => Ok(()), + } + } + + fn check_writable(&self) -> io::Result<()> { + match self { + WriteState::SendElementFoot | WriteState::FooterSent => Err(io::Error::new( + io::ErrorKind::NotConnected, + "stream footer already sent", + )), + WriteState::Failed | WriteState::Open => self.check_ok(), + } + } +} + +pin_project_lite::pin_project! 
{ + /// XML stream + /// + /// This struct represents an + /// [RFC 6120](https://tools.ietf.org/html/rfc6120) XML stream, where the + /// payload consists of items of type `T` implementing [`FromXml`] and + /// [`AsXml`]. + pub struct XmlStream { + #[pin] + inner: RawXmlStream, + read_state: Option>, + write_state: WriteState, + } +} + +impl XmlStream { + fn wrap(inner: RawXmlStream) -> Self { + Self { + inner, + read_state: Some(ReadXsoState::default()), + write_state: WriteState::Open, + } + } + + fn assert_retypable(&self) { + match self.read_state { + Some(ReadXsoState::PreData) => (), + Some(_) => panic!("cannot reset stream: XSO parsing in progress!"), + None => panic!("cannot reset stream: stream footer received!"), + } + match self.write_state.check_writable() { + Ok(()) => (), + Err(e) => panic!("cannot reset stream: {}", e), + } + } +} + +impl XmlStream { + /// Initiate a stream reset + /// + /// The `header` is the new stream header which is sent to the remote + /// party. + /// + /// # Panics + /// + /// Attempting to reset the stream while an object is being received will + /// panic. This can generally only happen if you call `poll_next` + /// directly, as doing that is otherwise prevented by the borrowchecker. + /// + /// In addition, attempting to reset a stream which has been closed by + /// either side or which has had an I/O error will also cause a panic. + pub async fn initiate_reset( + self, + header: StreamHeader<'_>, + ) -> io::Result> { + self.assert_retypable(); + + let mut stream = self.inner; + Pin::new(&mut stream).reset_state(); + header.send(Pin::new(&mut stream)).await?; + stream.flush().await?; + let header = StreamHeader::recv(Pin::new(&mut stream)).await?; + Ok(PendingFeaturesRecv { stream, header }) + } + + /// Anticipate a new stream header sent by the remote party. + /// + /// This is the responder-side counterpart to + /// [`initiate_reset`][`Self::initiate_reset`]. 
+ /// + /// # Panics + /// + /// Attempting to reset the stream while an object is being received will + /// panic. This can generally only happen if you call `poll_next` + /// directly, as doing that is otherwise prevented by the borrowchecker. + /// + /// In addition, attempting to reset a stream which has been closed by + /// either side or which has had an I/O error will also cause a panic. + pub async fn accept_reset(self) -> io::Result> { + self.assert_retypable(); + + let mut stream = self.inner; + Pin::new(&mut stream).reset_state(); + let header = StreamHeader::recv(Pin::new(&mut stream)).await?; + Ok(AcceptedStream { stream, header }) + } +} + +impl Stream for XmlStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + let result = match this.read_state.as_mut() { + // `None` marks a stream whose footer was already received. + None => return Poll::Ready(Some(Err(ReadError::StreamFooterReceived))), + Some(read_state) => ready!(read_state.poll_advance(this.inner, cx)), + }; + let result = match result { + Ok(v) => Poll::Ready(Some(Ok(v))), + Err(ReadXsoError::Hard(e)) => Poll::Ready(Some(Err(ReadError::HardError(e)))), + Err(ReadXsoError::Parse(e)) => Poll::Ready(Some(Err(ReadError::ParseError(e)))), + Err(ReadXsoError::Footer) => { + *this.read_state = None; + // Return early: falling through to the reset below would + // replace the `None` with a fresh state, so later polls + // would try to read past the footer instead of reporting + // `StreamFooterReceived` again. + return Poll::Ready(Some(Err(ReadError::StreamFooterReceived))); + } + }; + // Every other outcome ends the current read attempt; start with a + // fresh state for the next XSO. + *this.read_state = Some(ReadXsoState::default()); + result + } +} + +impl<'x, Io: AsyncWrite, T: FromXml + AsXml> Sink<&'x T> for XmlStream { + type Error = io::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.write_state.check_writable()?; + this.inner.poll_ready(cx) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.write_state.check_writable()?; + this.inner.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); +
this.write_state.check_ok()?; + loop { + match this.write_state { + // Open => initiate closing. + WriteState::Open => { + *this.write_state = WriteState::SendElementFoot; + } + // Sending => wait for readiness, then send. + WriteState::SendElementFoot => { + match ready!(this.inner.as_mut().poll_ready(cx)) + .and_then(|_| this.inner.as_mut().start_send(Item::ElementFoot)) + { + Ok(()) => (), + // If it fails, we fail the sink immediately. + Err(e) => { + *this.write_state = WriteState::Failed; + return Poll::Ready(Err(e)); + } + } + *this.write_state = WriteState::FooterSent; + } + // Footer sent => just poll the inner sink for closure. + WriteState::FooterSent => break, + WriteState::Failed => unreachable!(), // caught by check_ok() + } + } + this.inner.poll_close(cx) + } + + fn start_send(self: Pin<&mut Self>, item: &'x T) -> Result<(), Self::Error> { + let mut this = self.project(); + this.write_state.check_writable()?; + let iter = match item.as_xml_iter() { + Ok(v) => v, + Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)), + }; + for item in iter { + let item = match item { + Ok(v) => v, + Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)), + }; + this.inner.as_mut().start_send(item)?; + } + Ok(()) + } +} diff --git a/tokio-xmpp/src/xmlstream/responder.rs b/tokio-xmpp/src/xmlstream/responder.rs new file mode 100644 index 0000000000000000000000000000000000000000..3b0d7972397bc736fa7ea7484f3bcd056948b9d0 --- /dev/null +++ b/tokio-xmpp/src/xmlstream/responder.rs @@ -0,0 +1,98 @@ +// Copyright (c) 2024 Jonas Schäfer +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ +use core::pin::Pin; +use std::borrow::Cow; +use std::io; + +use futures::SinkExt; + +use tokio::io::{AsyncBufRead, AsyncWrite}; + +use xmpp_parsers::stream_features::StreamFeatures; + +use xso::{AsXml, FromXml}; + +use super::{ + common::{RawXmlStream, StreamHeader}, + XmlStream, +}; + +/// Type state for a responder stream which has received a stream header +/// +/// To continue stream setup, call [`send_header`][`Self::send_header`]. +pub struct AcceptedStream { + pub(super) stream: RawXmlStream, + pub(super) header: StreamHeader<'static>, +} + +impl AcceptedStream { + /// The stream header contents as sent by the peer. + pub fn header(&self) -> StreamHeader<'_> { + StreamHeader { + from: self.header.from.as_ref().map(|x| Cow::Borrowed(&**x)), + to: self.header.to.as_ref().map(|x| Cow::Borrowed(&**x)), + id: self.header.id.as_ref().map(|x| Cow::Borrowed(&**x)), + } + } + + /// Extract the stream header contents as sent by the peer. + pub fn take_header(&mut self) -> StreamHeader<'static> { + self.header.take() + } +} + +impl AcceptedStream { + /// Send a stream header. + /// + /// Sends the given stream header to the initiator. Returns a new object + /// which is prepared to send the stream features. + pub async fn send_header( + self, + header: StreamHeader<'_>, + ) -> io::Result> { + let Self { + mut stream, + header: _, + } = self; + + header.send(Pin::new(&mut stream)).await?; + Ok(PendingFeaturesSend { stream }) + } +} + +/// Type state for a responder stream which has received and sent the stream +/// header. +/// +/// To continue stream setup, call [`send_features`][`Self::send_features`]. +pub struct PendingFeaturesSend { + pub(super) stream: RawXmlStream, +} + +impl PendingFeaturesSend { + /// Send the responder's stream features. + /// + /// After the stream features have been sent, the stream can be used for + /// exchanging stream-level elements (stanzas or "nonzas"). The Rust type + /// for these elements must be given as type parameter `T`. 
+ pub async fn send_features( + self, + features: &'_ StreamFeatures, + ) -> io::Result> { + let Self { mut stream } = self; + let iter = features + .as_xml_iter() + .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; + + for item in iter { + let item = item.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; + stream.send(item).await?; + } + stream.flush().await?; + + Ok(XmlStream::wrap(stream)) + } +} diff --git a/tokio-xmpp/src/xmlstream/tests.rs b/tokio-xmpp/src/xmlstream/tests.rs new file mode 100644 index 0000000000000000000000000000000000000000..92ea5fc1e6f4aa66157e4dcb4e652e66297969e7 --- /dev/null +++ b/tokio-xmpp/src/xmlstream/tests.rs @@ -0,0 +1,264 @@ +// Copyright (c) 2024 Jonas Schäfer +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +use futures::StreamExt; + +use xmpp_parsers::stream_features::StreamFeatures; + +use super::*; + +#[derive(FromXml, AsXml, Debug)] +#[xml(namespace = "urn:example", name = "data")] +struct Data { + #[xml(text)] + contents: String, +} + +#[tokio::test] +async fn test_initiate_accept_stream() { + let (lhs, rhs) = tokio::io::duplex(65536); + let initiator = tokio::spawn(async move { + let mut stream = initiate_stream( + tokio::io::BufStream::new(lhs), + "jabber:client", + StreamHeader { + from: Some("client".into()), + to: Some("server".into()), + id: Some("client-id".into()), + }, + ) + .await?; + Ok::<_, io::Error>(stream.take_header()) + }); + let responder = tokio::spawn(async move { + let stream = accept_stream(tokio::io::BufStream::new(rhs), "jabber:client").await?; + assert_eq!(stream.header().from.unwrap(), "client"); + assert_eq!(stream.header().to.unwrap(), "server"); + assert_eq!(stream.header().id.unwrap(), "client-id"); + stream + .send_header(StreamHeader { + from: Some("server".into()), + to: Some("client".into()), + id: 
Some("server-id".into()), + }) + .await + }); + responder.await.unwrap().expect("responder"); + let server_header = initiator.await.unwrap().expect("initiator"); + assert_eq!(server_header.from.unwrap(), "server"); + assert_eq!(server_header.to.unwrap(), "client"); + assert_eq!(server_header.id.unwrap(), "server-id"); +} + +#[tokio::test] +async fn test_exchange_stream_features() { + let (lhs, rhs) = tokio::io::duplex(65536); + let initiator = tokio::spawn(async move { + let stream = initiate_stream( + tokio::io::BufStream::new(lhs), + "jabber:client", + StreamHeader::default(), + ) + .await?; + let (features, _) = stream.recv_features::().await?; + Ok::<_, io::Error>(features) + }); + let responder = tokio::spawn(async move { + let stream = accept_stream(tokio::io::BufStream::new(rhs), "jabber:client").await?; + let stream = stream.send_header(StreamHeader::default()).await?; + stream + .send_features::(&StreamFeatures::default()) + .await?; + Ok::<_, io::Error>(()) + }); + responder.await.unwrap().expect("responder failed"); + let features = initiator.await.unwrap().expect("initiator failed"); + assert_eq!(features, StreamFeatures::default()); +} + +#[tokio::test] +async fn test_exchange_data() { + let (lhs, rhs) = tokio::io::duplex(65536); + + let initiator = tokio::spawn(async move { + let stream = initiate_stream( + tokio::io::BufStream::new(lhs), + "jabber:client", + StreamHeader::default(), + ) + .await?; + let (_, mut stream) = stream.recv_features::().await?; + stream + .send(&Data { + contents: "hello".to_owned(), + }) + .await?; + match stream.next().await { + Some(Ok(Data { contents })) => assert_eq!(contents, "world!"), + other => panic!("unexpected stream message: {:?}", other), + } + Ok::<_, io::Error>(()) + }); + + let responder = tokio::spawn(async move { + let stream = accept_stream(tokio::io::BufStream::new(rhs), "jabber:client").await?; + let stream = stream.send_header(StreamHeader::default()).await?; + let mut stream = stream + 
.send_features::(&StreamFeatures::default()) + .await?; + stream + .send(&Data { + contents: "world!".to_owned(), + }) + .await?; + match stream.next().await { + Some(Ok(Data { contents })) => assert_eq!(contents, "hello"), + other => panic!("unexpected stream message: {:?}", other), + } + Ok::<_, io::Error>(()) + }); + + responder.await.unwrap().expect("responder failed"); + initiator.await.unwrap().expect("initiator failed"); +} + +#[tokio::test] +async fn test_clean_shutdown() { + let (lhs, rhs) = tokio::io::duplex(65536); + + let initiator = tokio::spawn(async move { + let stream = initiate_stream( + tokio::io::BufStream::new(lhs), + "jabber:client", + StreamHeader::default(), + ) + .await?; + let (_, mut stream) = stream.recv_features::().await?; + stream.close().await?; + match stream.next().await { + Some(Err(ReadError::StreamFooterReceived)) => (), + other => panic!("unexpected stream message: {:?}", other), + } + Ok::<_, io::Error>(()) + }); + + let responder = tokio::spawn(async move { + let stream = accept_stream(tokio::io::BufStream::new(rhs), "jabber:client").await?; + let stream = stream.send_header(StreamHeader::default()).await?; + let mut stream = stream + .send_features::(&StreamFeatures::default()) + .await?; + match stream.next().await { + Some(Err(ReadError::StreamFooterReceived)) => (), + other => panic!("unexpected stream message: {:?}", other), + } + stream.close().await?; + Ok::<_, io::Error>(()) + }); + + responder.await.unwrap().expect("responder failed"); + initiator.await.unwrap().expect("initiator failed"); +} + +#[tokio::test] +async fn test_exchange_data_stream_reset_and_shutdown() { + let (lhs, rhs) = tokio::io::duplex(65536); + + let initiator = tokio::spawn(async move { + let stream = initiate_stream( + tokio::io::BufStream::new(lhs), + "jabber:client", + StreamHeader::default(), + ) + .await?; + let (_, mut stream) = stream.recv_features::().await?; + stream + .send(&Data { + contents: "hello".to_owned(), + }) + .await?; + match 
stream.next().await { + Some(Ok(Data { contents })) => assert_eq!(contents, "world!"), + other => panic!("unexpected stream message: {:?}", other), + } + let stream = stream + .initiate_reset(StreamHeader { + from: Some("client".into()), + to: Some("server".into()), + id: Some("client-id".into()), + }) + .await?; + assert_eq!(stream.header().from.unwrap(), "server"); + assert_eq!(stream.header().to.unwrap(), "client"); + assert_eq!(stream.header().id.unwrap(), "server-id"); + + let (_, mut stream) = stream.recv_features::().await?; + stream + .send(&Data { + contents: "once more".to_owned(), + }) + .await?; + stream.close().await?; + match stream.next().await { + Some(Ok(Data { contents })) => assert_eq!(contents, "hello world!"), + other => panic!("unexpected stream message: {:?}", other), + } + match stream.next().await { + Some(Err(ReadError::StreamFooterReceived)) => (), + other => panic!("unexpected stream message: {:?}", other), + } + Ok::<_, io::Error>(()) + }); + + let responder = tokio::spawn(async move { + let stream = accept_stream(tokio::io::BufStream::new(rhs), "jabber:client").await?; + let stream = stream.send_header(StreamHeader::default()).await?; + let mut stream = stream + .send_features::(&StreamFeatures::default()) + .await?; + stream + .send(&Data { + contents: "world!".to_owned(), + }) + .await?; + match stream.next().await { + Some(Ok(Data { contents })) => assert_eq!(contents, "hello"), + other => panic!("unexpected stream message: {:?}", other), + } + let stream = stream.accept_reset().await?; + assert_eq!(stream.header().from.unwrap(), "client"); + assert_eq!(stream.header().to.unwrap(), "server"); + assert_eq!(stream.header().id.unwrap(), "client-id"); + let stream = stream + .send_header(StreamHeader { + from: Some("server".into()), + to: Some("client".into()), + id: Some("server-id".into()), + }) + .await?; + let mut stream = stream + .send_features::(&StreamFeatures::default()) + .await?; + stream + .send(&Data { + contents: "hello 
world!".to_owned(), + }) + .await?; + match stream.next().await { + Some(Ok(Data { contents })) => assert_eq!(contents, "once more"), + other => panic!("unexpected stream message: {:?}", other), + } + stream.close().await?; + match stream.next().await { + Some(Err(ReadError::StreamFooterReceived)) => (), + other => panic!("unexpected stream message: {:?}", other), + } + Ok::<_, io::Error>(()) + }); + + responder.await.unwrap().expect("responder failed"); + initiator.await.unwrap().expect("initiator failed"); +}