Detailed changes
@@ -15,6 +15,8 @@ XXXX-YY-ZZ RELEASER <admin@example.com>
- `AsyncClient::poll_next` properly closes stream with `Poll::Ready(None)` when disconnecting without auto reconnect (!436)
- remove `tokio_xmpp::SimpleClient` because it was not widely used, and not well documented ; if you need it,
please let us know and it will be reintegrated (!428)
+ - `XMPPStream` was renamed `XmppStream` and is now published as `tokio_xmpp::proto::XmppStream` (!428)
+ - `XmppCodec` was moved to proto module and is now published as `tokio_xmpp::proto::XmppCodec` (!428)
Version 4.0.0:
2024-07-26 Maxime βpepβ Buquet <pep@bouah.net>
@@ -2,7 +2,7 @@ use futures::{SinkExt, StreamExt};
use tokio::{self, io, net::TcpSocket};
use tokio_util::codec::Framed;
-use tokio_xmpp::XmppCodec;
+use tokio_xmpp::proto::XmppCodec;
#[tokio::main]
async fn main() -> Result<(), io::Error> {
@@ -6,14 +6,16 @@ use std::task::Context;
use tokio::task::JoinHandle;
use xmpp_parsers::{jid::Jid, ns, stream_features::StreamFeatures};
-use super::connect::client_login;
+use crate::{
+ client::connect::client_login,
+ connect::{AsyncReadAndWrite, ServerConnector},
+ error::{Error, ProtocolError},
+ proto::{add_stanza_id, Packet, XmppStream},
+ Event,
+};
+
#[cfg(feature = "starttls")]
use crate::connect::starttls::ServerConfig;
-use crate::connect::{AsyncReadAndWrite, ServerConnector};
-use crate::error::{Error, ProtocolError};
-use crate::event::Event;
-use crate::xmpp_codec::Packet;
-use crate::xmpp_stream::{add_stanza_id, XMPPStream};
#[cfg(feature = "starttls")]
use crate::AsyncConfig;
@@ -44,8 +46,8 @@ pub struct Config<C> {
enum ClientState<S: AsyncReadAndWrite> {
Invalid,
Disconnected,
- Connecting(JoinHandle<Result<XMPPStream<S>, Error>>),
- Connected(XMPPStream<S>),
+ Connecting(JoinHandle<Result<XmppStream<S>, Error>>),
+ Connected(XmppStream<S>),
}
#[cfg(feature = "starttls")]
@@ -197,7 +199,7 @@ impl<C: ServerConnector> Stream for Client<C> {
//
// This needs to be a loop in order to ignore packets we donβt care about, or those
// we want to handle elsewhere. Returning something isnβt correct in those two
- // cases because it would signal to tokio that the XMPPStream is also done, while
+ // cases because it would signal to tokio that the XmppStream is also done, while
// there could be additional packets waiting for us.
//
// The proper solution is thus a loop which we exit once we have something to
@@ -9,11 +9,10 @@ use tokio::io::{AsyncRead, AsyncWrite};
use xmpp_parsers::sasl::{Auth, Challenge, Failure, Mechanism as XMPPMechanism, Response, Success};
use crate::error::{AuthError, Error, ProtocolError};
-use crate::xmpp_codec::Packet;
-use crate::xmpp_stream::XMPPStream;
+use crate::proto::{Packet, XmppStream};
pub async fn auth<S: AsyncRead + AsyncWrite + Unpin>(
- mut stream: XMPPStream<S>,
+ mut stream: XmppStream<S>,
creds: Credentials,
) -> Result<S, Error> {
let local_mechs: Vec<Box<dyn Fn() -> Box<dyn Mechanism + Send + Sync> + Send>> = vec![
@@ -4,14 +4,13 @@ use xmpp_parsers::bind::{BindQuery, BindResponse};
use xmpp_parsers::iq::{Iq, IqType};
use crate::error::{Error, ProtocolError};
-use crate::xmpp_codec::Packet;
-use crate::xmpp_stream::XMPPStream;
+use crate::proto::{Packet, XmppStream};
const BIND_REQ_ID: &str = "resource-bind";
pub async fn bind<S: AsyncRead + AsyncWrite + Unpin>(
- mut stream: XMPPStream<S>,
-) -> Result<XMPPStream<S>, Error> {
+ mut stream: XmppStream<S>,
+) -> Result<XmppStream<S>, Error> {
if stream.stream_features.can_bind() {
let resource = stream
.jid
@@ -1,10 +1,12 @@
use sasl::common::Credentials;
use xmpp_parsers::{jid::Jid, ns};
-use crate::client::auth::auth;
-use crate::client::bind::bind;
-use crate::connect::ServerConnector;
-use crate::{xmpp_stream::XMPPStream, Error};
+use crate::{
+ client::{auth::auth, bind::bind},
+ connect::ServerConnector,
+ proto::XmppStream,
+ Error,
+};
/// Log into an XMPP server as a client with a jid+pass
/// does channel binding if supported
@@ -12,7 +14,7 @@ pub async fn client_login<C: ServerConnector>(
server: C,
jid: Jid,
password: String,
-) -> Result<XMPPStream<C::Stream>, Error> {
+) -> Result<XmppStream<C::Stream>, Error> {
let username = jid.node().unwrap().as_str();
let password = password;
@@ -26,10 +28,10 @@ pub async fn client_login<C: ServerConnector>(
.with_channel_binding(channel_binding);
// Authenticated (unspecified) stream
let stream = auth(xmpp_stream, creds).await?;
- // Authenticated XMPPStream
- let xmpp_stream = XMPPStream::start(stream, jid, ns::JABBER_CLIENT.to_owned()).await?;
+ // Authenticated XmppStream
+ let xmpp_stream = XmppStream::start(stream, jid, ns::JABBER_CLIENT.to_owned()).await?;
- // XMPPStream bound to user session
+ // XmppStream bound to user session
let xmpp_stream = bind(xmpp_stream).await?;
Ok(xmpp_stream)
}
@@ -3,11 +3,10 @@ use tokio::io::{AsyncRead, AsyncWrite};
use xmpp_parsers::{component::Handshake, ns};
use crate::error::{AuthError, Error};
-use crate::xmpp_codec::Packet;
-use crate::xmpp_stream::XMPPStream;
+use crate::proto::{Packet, XmppStream};
pub async fn auth<S: AsyncRead + AsyncWrite + Unpin>(
- stream: &mut XMPPStream<S>,
+ stream: &mut XmppStream<S>,
password: String,
) -> Result<(), Error> {
let nonza = Handshake::from_password_and_stream_id(&password, &stream.id);
@@ -1,16 +1,13 @@
use xmpp_parsers::{jid::Jid, ns};
-use crate::connect::ServerConnector;
-use crate::{xmpp_stream::XMPPStream, Error};
-
-use super::auth::auth;
+use crate::{component::auth::auth, connect::ServerConnector, proto::XmppStream, Error};
/// Log into an XMPP server as a client with a jid+pass
pub async fn component_login<C: ServerConnector>(
connector: C,
jid: Jid,
password: String,
-) -> Result<XMPPStream<C::Stream>, Error> {
+) -> Result<XmppStream<C::Stream>, Error> {
let password = password;
let mut xmpp_stream = connector.connect(&jid, ns::COMPONENT).await?;
auth(&mut xmpp_stream, password).await?;
@@ -10,11 +10,9 @@ use xmpp_parsers::{jid::Jid, ns};
use self::connect::component_login;
-use super::xmpp_codec::Packet;
-use super::Error;
use crate::connect::ServerConnector;
-use crate::xmpp_stream::add_stanza_id;
-use crate::xmpp_stream::XMPPStream;
+use crate::proto::{add_stanza_id, Packet, XmppStream};
+use crate::Error;
mod auth;
@@ -22,12 +20,12 @@ pub(crate) mod connect;
/// Component connection to an XMPP server
///
-/// This simplifies the `XMPPStream` to a `Stream`/`Sink` of `Element`
+/// This simplifies the `XmppStream` to a `Stream`/`Sink` of `Element`
/// (stanzas). Connection handling however is up to the user.
pub struct Component<C: ServerConnector> {
/// The component's Jabber-Id
pub jid: Jid,
- stream: XMPPStream<C::Stream>,
+ stream: XmppStream<C::Stream>,
}
impl<C: ServerConnector> Component<C> {
@@ -14,7 +14,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use xmpp_parsers::jid::Jid;
-use crate::xmpp_stream::XMPPStream;
+use crate::proto::XmppStream;
use crate::Error;
#[cfg(feature = "starttls")]
@@ -22,7 +22,7 @@ pub mod starttls;
#[cfg(feature = "insecure-tcp")]
pub mod tcp;
-/// trait returned wrapped in XMPPStream by ServerConnector
+/// trait returned wrapped in XmppStream by ServerConnector
pub trait AsyncReadAndWrite: AsyncRead + AsyncWrite + Unpin + Send {}
impl<T: AsyncRead + AsyncWrite + Unpin + Send> AsyncReadAndWrite for T {}
@@ -38,7 +38,7 @@ pub trait ServerConnector: Clone + core::fmt::Debug + Send + Unpin + 'static {
&self,
jid: &Jid,
ns: &str,
- ) -> impl std::future::Future<Output = Result<XMPPStream<Self::Stream>, Error>> + Send;
+ ) -> impl std::future::Future<Output = Result<XmppStream<Self::Stream>, Error>> + Send;
/// Return channel binding data if available
/// do not fail if channel binding is simply unavailable, just return Ok(None)
@@ -39,8 +39,7 @@ use xmpp_parsers::{jid::Jid, ns};
use crate::{
connect::{ServerConnector, ServerConnectorError, Tcp},
error::{Error, ProtocolError},
- xmpp_codec::Packet,
- xmpp_stream::XMPPStream,
+ proto::{Packet, XmppStream},
AsyncClient,
};
@@ -64,7 +63,7 @@ pub enum ServerConfig {
impl ServerConnector for ServerConfig {
type Stream = TlsStream<TcpStream>;
- async fn connect(&self, jid: &Jid, ns: &str) -> Result<XMPPStream<Self::Stream>, Error> {
+ async fn connect(&self, jid: &Jid, ns: &str) -> Result<XmppStream<Self::Stream>, Error> {
// TCP connection
let tcp_stream = match self {
ServerConfig::UseSrv => {
@@ -73,14 +72,14 @@ impl ServerConnector for ServerConfig {
ServerConfig::Manual { host, port } => Tcp::resolve(host.as_str(), *port).await?,
};
- // Unencryped XMPPStream
- let xmpp_stream = XMPPStream::start(tcp_stream, jid.clone(), ns.to_owned()).await?;
+ // Unencryped XmppStream
+ let xmpp_stream = XmppStream::start(tcp_stream, jid.clone(), ns.to_owned()).await?;
if xmpp_stream.stream_features.can_starttls() {
// TlsStream
let tls_stream = starttls(xmpp_stream).await?;
- // Encrypted XMPPStream
- Ok(XMPPStream::start(tls_stream, jid.clone(), ns.to_owned()).await?)
+ // Encrypted XmppStream
+ Ok(XmppStream::start(tls_stream, jid.clone(), ns.to_owned()).await?)
} else {
return Err(crate::Error::Protocol(ProtocolError::NoTls).into());
}
@@ -114,7 +113,7 @@ impl ServerConnector for ServerConfig {
#[cfg(feature = "tls-native")]
async fn get_tls_stream<S: AsyncRead + AsyncWrite + Unpin>(
- xmpp_stream: XMPPStream<S>,
+ xmpp_stream: XmppStream<S>,
) -> Result<TlsStream<S>, Error> {
let domain = xmpp_stream.jid.domain().to_owned();
let stream = xmpp_stream.into_inner();
@@ -127,7 +126,7 @@ async fn get_tls_stream<S: AsyncRead + AsyncWrite + Unpin>(
#[cfg(all(feature = "tls-rust", not(feature = "tls-native")))]
async fn get_tls_stream<S: AsyncRead + AsyncWrite + Unpin>(
- xmpp_stream: XMPPStream<S>,
+ xmpp_stream: XmppStream<S>,
) -> Result<TlsStream<S>, Error> {
let domain = xmpp_stream.jid.domain().to_string();
let domain = ServerName::try_from(domain).map_err(|e| StartTlsError::DnsNameError(e))?;
@@ -145,10 +144,10 @@ async fn get_tls_stream<S: AsyncRead + AsyncWrite + Unpin>(
Ok(tls_stream)
}
-/// Performs `<starttls/>` on an XMPPStream and returns a binary
+/// Performs `<starttls/>` on an XmppStream and returns a binary
/// TlsStream.
pub async fn starttls<S: AsyncRead + AsyncWrite + Unpin>(
- mut xmpp_stream: XMPPStream<S>,
+ mut xmpp_stream: XmppStream<S>,
) -> Result<TlsStream<S>, Error> {
let nonza = Element::builder("starttls", ns::TLS).build();
let packet = Packet::Stanza(nonza);
@@ -4,7 +4,7 @@ use std::sync::Arc;
use tokio::net::TcpStream;
-use crate::{connect::ServerConnector, xmpp_stream::XMPPStream, Component, Error};
+use crate::{connect::ServerConnector, proto::XmppStream, Component, Error};
/// Component that connects over TCP
pub type TcpComponent = Component<TcpServerConnector>;
@@ -28,11 +28,11 @@ impl ServerConnector for TcpServerConnector {
&self,
jid: &xmpp_parsers::jid::Jid,
ns: &str,
- ) -> Result<XMPPStream<Self::Stream>, Error> {
+ ) -> Result<XmppStream<Self::Stream>, Error> {
let stream = TcpStream::connect(&*self.0)
.await
.map_err(|e| crate::Error::Io(e))?;
- Ok(XMPPStream::start(stream, jid.clone(), ns.to_owned()).await?)
+ Ok(XmppStream::start(stream, jid.clone(), ns.to_owned()).await?)
}
}
@@ -20,14 +20,11 @@ compile_error!(
"when starttls feature enabled one of tls-native and tls-rust features must be enabled."
);
-mod stream_start;
-mod xmpp_codec;
-pub use crate::xmpp_codec::{Packet, XmppCodec};
mod event;
pub use event::Event;
mod client;
pub mod connect;
-pub mod xmpp_stream;
+pub mod proto;
pub use client::async_client::{Client as AsyncClient, Config as AsyncConfig};
mod component;
@@ -0,0 +1,8 @@
+//! Low-level stream establishment
+
+mod xmpp_codec;
+mod xmpp_stream;
+
+pub use xmpp_codec::{Packet, XmppCodec};
+pub(crate) use xmpp_stream::add_stanza_id;
+pub use xmpp_stream::XmppStream;
@@ -1,18 +1,21 @@
-//! `XMPPStream` provides encoding/decoding for XMPP
-
-use futures::sink::Send;
-use futures::{sink::SinkExt, task::Poll, Sink, Stream};
+//! `XmppStream` provides encoding/decoding for XMPP
+
+use futures::{
+ sink::{Send, SinkExt},
+ stream::StreamExt,
+ task::Poll,
+ Sink, Stream,
+};
use minidom::Element;
use rand::{thread_rng, Rng};
use std::pin::Pin;
use std::task::Context;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;
-use xmpp_parsers::{jid::Jid, stream_features::StreamFeatures};
+use xmpp_parsers::{jid::Jid, ns, stream_features::StreamFeatures, Error as ParsersError};
-use crate::stream_start;
-use crate::xmpp_codec::{Packet, XmppCodec};
-use crate::Error;
+use crate::error::{Error, ProtocolError};
+use crate::proto::{Packet, XmppCodec};
fn make_id() -> String {
let id: u64 = thread_rng().gen();
@@ -36,7 +39,7 @@ pub(crate) fn add_stanza_id(mut stanza: Element, default_ns: &str) -> Element {
/// and encode XMPP packets.
///
/// Implements `Sink + Stream`
-pub struct XMPPStream<S: AsyncRead + AsyncWrite + Unpin> {
+pub struct XmppStream<S: AsyncRead + AsyncWrite + Unpin> {
/// The local Jabber-Id
pub jid: Jid,
/// Codec instance
@@ -52,7 +55,7 @@ pub struct XMPPStream<S: AsyncRead + AsyncWrite + Unpin> {
pub id: String,
}
-impl<S: AsyncRead + AsyncWrite + Unpin> XMPPStream<S> {
+impl<S: AsyncRead + AsyncWrite + Unpin> XmppStream<S> {
/// Constructor
pub fn new(
jid: Jid,
@@ -61,7 +64,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> XMPPStream<S> {
id: String,
stream_features: StreamFeatures,
) -> Self {
- XMPPStream {
+ XmppStream {
jid,
stream,
stream_features,
@@ -72,8 +75,62 @@ impl<S: AsyncRead + AsyncWrite + Unpin> XMPPStream<S> {
/// Send a `<stream:stream>` start tag
pub async fn start(stream: S, jid: Jid, ns: String) -> Result<Self, Error> {
- let xmpp_stream = Framed::new(stream, XmppCodec::new());
- stream_start::start(xmpp_stream, jid, ns).await
+ let mut stream = Framed::new(stream, XmppCodec::new());
+ let attrs = [
+ ("to".to_owned(), jid.domain().to_string()),
+ ("version".to_owned(), "1.0".to_owned()),
+ ("xmlns".to_owned(), ns.clone()),
+ ("xmlns:stream".to_owned(), ns::STREAM.to_owned()),
+ ]
+ .iter()
+ .cloned()
+ .collect();
+ stream.send(Packet::StreamStart(attrs)).await?;
+
+ let stream_attrs;
+ loop {
+ match stream.next().await {
+ Some(Ok(Packet::StreamStart(attrs))) => {
+ stream_attrs = attrs;
+ break;
+ }
+ Some(Ok(_)) => {}
+ Some(Err(e)) => return Err(e.into()),
+ None => return Err(Error::Disconnected),
+ }
+ }
+
+ let stream_ns = stream_attrs
+ .get("xmlns")
+ .ok_or(ProtocolError::NoStreamNamespace)?
+ .clone();
+ let stream_id = stream_attrs
+ .get("id")
+ .ok_or(ProtocolError::NoStreamId)?
+ .clone();
+ if stream_ns == "jabber:client" && stream_attrs.get("version").is_some() {
+ loop {
+ match stream.next().await {
+ Some(Ok(Packet::Stanza(stanza))) => {
+ let stream_features = StreamFeatures::try_from(stanza)
+ .map_err(|e| Error::Protocol(ParsersError::from(e).into()))?;
+ return Ok(XmppStream::new(jid, stream, ns, stream_id, stream_features));
+ }
+ Some(Ok(_)) => {}
+ Some(Err(e)) => return Err(e.into()),
+ None => return Err(Error::Disconnected),
+ }
+ }
+ } else {
+ // FIXME: huge hack, shouldnβt be an element!
+ return Ok(XmppStream::new(
+ jid,
+ stream,
+ ns,
+ stream_id.clone(),
+ StreamFeatures::default(),
+ ));
+ }
}
/// Unwraps the inner stream
@@ -88,7 +145,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> XMPPStream<S> {
}
}
-impl<S: AsyncRead + AsyncWrite + Unpin> XMPPStream<S> {
+impl<S: AsyncRead + AsyncWrite + Unpin> XmppStream<S> {
/// Convenience method
pub fn send_stanza<E: Into<Element>>(&mut self, e: E) -> Send<Self, Packet> {
self.send(Packet::Stanza(e.into()))
@@ -96,7 +153,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> XMPPStream<S> {
}
/// Proxy to self.stream
-impl<S: AsyncRead + AsyncWrite + Unpin> Sink<Packet> for XMPPStream<S> {
+impl<S: AsyncRead + AsyncWrite + Unpin> Sink<Packet> for XmppStream<S> {
type Error = crate::Error;
fn poll_ready(self: Pin<&mut Self>, _ctx: &mut Context) -> Poll<Result<(), Self::Error>> {
@@ -125,7 +182,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Sink<Packet> for XMPPStream<S> {
}
/// Proxy to self.stream
-impl<S: AsyncRead + AsyncWrite + Unpin> Stream for XMPPStream<S> {
+impl<S: AsyncRead + AsyncWrite + Unpin> Stream for XmppStream<S> {
type Item = Result<Packet, crate::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
@@ -1,72 +0,0 @@
-use futures::{sink::SinkExt, stream::StreamExt};
-use tokio::io::{AsyncRead, AsyncWrite};
-use tokio_util::codec::Framed;
-use xmpp_parsers::{jid::Jid, ns, stream_features::StreamFeatures, Error as ParsersError};
-
-use crate::error::{Error, ProtocolError};
-use crate::xmpp_codec::{Packet, XmppCodec};
-use crate::xmpp_stream::XMPPStream;
-
-/// Sends a `<stream:stream>`, then wait for one from the server, and
-/// construct an XMPPStream.
-pub async fn start<S: AsyncRead + AsyncWrite + Unpin>(
- mut stream: Framed<S, XmppCodec>,
- jid: Jid,
- ns: String,
-) -> Result<XMPPStream<S>, Error> {
- let attrs = [
- ("to".to_owned(), jid.domain().to_string()),
- ("version".to_owned(), "1.0".to_owned()),
- ("xmlns".to_owned(), ns.clone()),
- ("xmlns:stream".to_owned(), ns::STREAM.to_owned()),
- ]
- .iter()
- .cloned()
- .collect();
- stream.send(Packet::StreamStart(attrs)).await?;
-
- let stream_attrs;
- loop {
- match stream.next().await {
- Some(Ok(Packet::StreamStart(attrs))) => {
- stream_attrs = attrs;
- break;
- }
- Some(Ok(_)) => {}
- Some(Err(e)) => return Err(e.into()),
- None => return Err(Error::Disconnected),
- }
- }
-
- let stream_ns = stream_attrs
- .get("xmlns")
- .ok_or(ProtocolError::NoStreamNamespace)?
- .clone();
- let stream_id = stream_attrs
- .get("id")
- .ok_or(ProtocolError::NoStreamId)?
- .clone();
- if stream_ns == "jabber:client" && stream_attrs.get("version").is_some() {
- loop {
- match stream.next().await {
- Some(Ok(Packet::Stanza(stanza))) => {
- let stream_features = StreamFeatures::try_from(stanza)
- .map_err(|e| Error::Protocol(ParsersError::from(e).into()))?;
- return Ok(XMPPStream::new(jid, stream, ns, stream_id, stream_features));
- }
- Some(Ok(_)) => {}
- Some(Err(e)) => return Err(e.into()),
- None => return Err(Error::Disconnected),
- }
- }
- } else {
- // FIXME: huge hack, shouldnβt be an element!
- return Ok(XMPPStream::new(
- jid,
- stream,
- ns,
- stream_id.clone(),
- StreamFeatures::default(),
- ));
- }
-}