From fde4c2b6400d7d7a912396b804bb468c5753ecd9 Mon Sep 17 00:00:00 2001 From: xmppftw Date: Tue, 6 Aug 2024 17:00:53 +0200 Subject: [PATCH] Move XmppCodec and XmppStream to proto module --- tokio-xmpp/ChangeLog | 2 + tokio-xmpp/examples/echo_server.rs | 2 +- tokio-xmpp/src/client/async_client.rs | 20 ++--- tokio-xmpp/src/client/auth.rs | 5 +- tokio-xmpp/src/client/bind.rs | 7 +- tokio-xmpp/src/client/connect.rs | 18 +++-- tokio-xmpp/src/component/auth.rs | 5 +- tokio-xmpp/src/component/connect.rs | 7 +- tokio-xmpp/src/component/mod.rs | 10 +-- tokio-xmpp/src/connect/mod.rs | 6 +- tokio-xmpp/src/connect/starttls.rs | 21 +++--- tokio-xmpp/src/connect/tcp.rs | 6 +- tokio-xmpp/src/lib.rs | 5 +- tokio-xmpp/src/proto/mod.rs | 8 ++ tokio-xmpp/src/{ => proto}/xmpp_codec.rs | 0 tokio-xmpp/src/{ => proto}/xmpp_stream.rs | 89 +++++++++++++++++++---- tokio-xmpp/src/stream_start.rs | 72 ------------------ 17 files changed, 135 insertions(+), 148 deletions(-) create mode 100644 tokio-xmpp/src/proto/mod.rs rename tokio-xmpp/src/{ => proto}/xmpp_codec.rs (100%) rename tokio-xmpp/src/{ => proto}/xmpp_stream.rs (54%) delete mode 100644 tokio-xmpp/src/stream_start.rs diff --git a/tokio-xmpp/ChangeLog b/tokio-xmpp/ChangeLog index e785c2406acf405b9278acd3678aff180feb38c2..e1824a298fd06a7038036e22e7d97e7d369e5a37 100644 --- a/tokio-xmpp/ChangeLog +++ b/tokio-xmpp/ChangeLog @@ -15,6 +15,8 @@ XXXX-YY-ZZ RELEASER - `AsyncClient::poll_next` properly closes stream with `Poll::Ready(None)` when disconnecting without auto reconnect (!436) - remove `tokio_xmpp::SimpleClient` because it was not widely used, and not well documented ; if you need it, please let us know and it will be reintegrated (!428) + - `XMPPStream` was renamed `XmppStream` and is now published as `tokio_xmpp::proto::XmppStream` (!428) + - `XmppCodec` was moved to proto module and is now published as `tokio_xmpp::proto::XmppCodec` (!428) Version 4.0.0: 2024-07-26 Maxime “pep” Buquet diff --git a/tokio-xmpp/examples/echo_server.rs b/tokio-xmpp/examples/echo_server.rs index c80362460773330ab347fa7be34b4340c690edc0..67664715888ab8b269bf708f616bf40c7f73d348 100644 --- a/tokio-xmpp/examples/echo_server.rs +++ b/tokio-xmpp/examples/echo_server.rs @@ -2,7 +2,7 @@ use futures::{SinkExt, StreamExt}; use tokio::{self, io, net::TcpSocket}; use tokio_util::codec::Framed; -use tokio_xmpp::XmppCodec; +use tokio_xmpp::proto::XmppCodec; #[tokio::main] async fn main() -> Result<(), io::Error> { diff --git a/tokio-xmpp/src/client/async_client.rs b/tokio-xmpp/src/client/async_client.rs index cfd5a26027986b6480e6b6bacb9df668e85243ba..5b74ec74b1c9558c101b5082793321890ee2e0d4 100644 --- a/tokio-xmpp/src/client/async_client.rs +++ b/tokio-xmpp/src/client/async_client.rs @@ -6,14 +6,16 @@ use std::task::Context; use tokio::task::JoinHandle; use xmpp_parsers::{jid::Jid, ns, stream_features::StreamFeatures}; -use super::connect::client_login; +use crate::{ + client::connect::client_login, + connect::{AsyncReadAndWrite, ServerConnector}, + error::{Error, ProtocolError}, + proto::{add_stanza_id, Packet, XmppStream}, + Event, +}; + #[cfg(feature = "starttls")] use crate::connect::starttls::ServerConfig; -use crate::connect::{AsyncReadAndWrite, ServerConnector}; -use crate::error::{Error, ProtocolError}; -use crate::event::Event; -use crate::xmpp_codec::Packet; -use crate::xmpp_stream::{add_stanza_id, XMPPStream}; #[cfg(feature = "starttls")] use crate::AsyncConfig; @@ -44,8 +46,8 @@ pub struct Config { enum ClientState { Invalid, Disconnected, - Connecting(JoinHandle, Error>>), - Connected(XMPPStream), + Connecting(JoinHandle, Error>>), + Connected(XmppStream), } #[cfg(feature = "starttls")] @@ -197,7 +199,7 @@ impl Stream for Client { // // This needs to be a loop in order to ignore packets we don’t care about, or those // we want to handle elsewhere. Returning something isn’t correct in those two - // cases because it would signal to tokio that the XMPPStream is also done, while + // cases because it would signal to tokio that the XmppStream is also done, while // there could be additional packets waiting for us. // // The proper solution is thus a loop which we exit once we have something to diff --git a/tokio-xmpp/src/client/auth.rs b/tokio-xmpp/src/client/auth.rs index 39292a495772ba66028e9fe108adfa4bfcab17d5..217d496f6eacf50cdbbce4b726e258a39c81e454 100644 --- a/tokio-xmpp/src/client/auth.rs +++ b/tokio-xmpp/src/client/auth.rs @@ -9,11 +9,10 @@ use tokio::io::{AsyncRead, AsyncWrite}; use xmpp_parsers::sasl::{Auth, Challenge, Failure, Mechanism as XMPPMechanism, Response, Success}; use crate::error::{AuthError, Error, ProtocolError}; -use crate::xmpp_codec::Packet; -use crate::xmpp_stream::XMPPStream; +use crate::proto::{Packet, XmppStream}; pub async fn auth( - mut stream: XMPPStream, + mut stream: XmppStream, creds: Credentials, ) -> Result { let local_mechs: Vec Box + Send>> = vec![ diff --git a/tokio-xmpp/src/client/bind.rs b/tokio-xmpp/src/client/bind.rs index de791a8fbacdf105d7300ff90df2859517023ccf..2f282ad43b821c4f2747a5edc6e8bc0da5ad72bc 100644 --- a/tokio-xmpp/src/client/bind.rs +++ b/tokio-xmpp/src/client/bind.rs @@ -4,14 +4,13 @@ use xmpp_parsers::bind::{BindQuery, BindResponse}; use xmpp_parsers::iq::{Iq, IqType}; use crate::error::{Error, ProtocolError}; -use crate::xmpp_codec::Packet; -use crate::xmpp_stream::XMPPStream; +use crate::proto::{Packet, XmppStream}; const BIND_REQ_ID: &str = "resource-bind"; pub async fn bind( - mut stream: XMPPStream, -) -> Result, Error> { + mut stream: XmppStream, +) -> Result, Error> { if stream.stream_features.can_bind() { let resource = stream .jid diff --git a/tokio-xmpp/src/client/connect.rs b/tokio-xmpp/src/client/connect.rs index 41db57e095fce1ec3dec67c305a2927ef47fc4cc..7e02e1000f5b3f1977c5bb212779e629c440d133 100644 --- a/tokio-xmpp/src/client/connect.rs +++ b/tokio-xmpp/src/client/connect.rs @@ -1,10 +1,12 @@ use sasl::common::Credentials; use xmpp_parsers::{jid::Jid, ns}; -use crate::client::auth::auth; -use crate::client::bind::bind; -use crate::connect::ServerConnector; -use crate::{xmpp_stream::XMPPStream, Error}; +use crate::{ + client::{auth::auth, bind::bind}, + connect::ServerConnector, + proto::XmppStream, + Error, +}; /// Log into an XMPP server as a client with a jid+pass /// does channel binding if supported @@ -12,7 +14,7 @@ pub async fn client_login( server: C, jid: Jid, password: String, -) -> Result, Error> { +) -> Result, Error> { let username = jid.node().unwrap().as_str(); let password = password; @@ -26,10 +28,10 @@ pub async fn client_login( .with_channel_binding(channel_binding); // Authenticated (unspecified) stream let stream = auth(xmpp_stream, creds).await?; - // Authenticated XMPPStream - let xmpp_stream = XMPPStream::start(stream, jid, ns::JABBER_CLIENT.to_owned()).await?; + // Authenticated XmppStream + let xmpp_stream = XmppStream::start(stream, jid, ns::JABBER_CLIENT.to_owned()).await?; - // XMPPStream bound to user session + // XmppStream bound to user session let xmpp_stream = bind(xmpp_stream).await?; Ok(xmpp_stream) } diff --git a/tokio-xmpp/src/component/auth.rs b/tokio-xmpp/src/component/auth.rs index 370c77a18e32d129c806df5339b4a40ac4b30663..0bb10b01891ca7ff0f0d42b4750b166ee66d8c01 100644 --- a/tokio-xmpp/src/component/auth.rs +++ b/tokio-xmpp/src/component/auth.rs @@ -3,11 +3,10 @@ use tokio::io::{AsyncRead, AsyncWrite}; use xmpp_parsers::{component::Handshake, ns}; use crate::error::{AuthError, Error}; -use crate::xmpp_codec::Packet; -use crate::xmpp_stream::XMPPStream; +use crate::proto::{Packet, XmppStream}; pub async fn auth( - stream: &mut XMPPStream, + stream: &mut XmppStream, password: String, ) -> Result<(), Error> { let nonza = Handshake::from_password_and_stream_id(&password, &stream.id); diff --git a/tokio-xmpp/src/component/connect.rs b/tokio-xmpp/src/component/connect.rs index 63e66d07d28de5c28578e22a88e6347c7a49e26d..a7fb16241073baf2c0ecfbdd999bf66bf5b80919 100644 --- a/tokio-xmpp/src/component/connect.rs +++ b/tokio-xmpp/src/component/connect.rs @@ -1,16 +1,13 @@ use xmpp_parsers::{jid::Jid, ns}; -use crate::connect::ServerConnector; -use crate::{xmpp_stream::XMPPStream, Error}; - -use super::auth::auth; +use crate::{component::auth::auth, connect::ServerConnector, proto::XmppStream, Error}; /// Log into an XMPP server as a client with a jid+pass pub async fn component_login( connector: C, jid: Jid, password: String, -) -> Result, Error> { +) -> Result, Error> { let password = password; let mut xmpp_stream = connector.connect(&jid, ns::COMPONENT).await?; auth(&mut xmpp_stream, password).await?; diff --git a/tokio-xmpp/src/component/mod.rs b/tokio-xmpp/src/component/mod.rs index 17b1b38e0993d59b992cf1bb23386c4e87360c2d..0a51362d0edba173e066b8ff64b8f0464969caa5 100644 --- a/tokio-xmpp/src/component/mod.rs +++ b/tokio-xmpp/src/component/mod.rs @@ -10,11 +10,9 @@ use xmpp_parsers::{jid::Jid, ns}; use self::connect::component_login; -use super::xmpp_codec::Packet; -use super::Error; use crate::connect::ServerConnector; -use crate::xmpp_stream::add_stanza_id; -use crate::xmpp_stream::XMPPStream; +use crate::proto::{add_stanza_id, Packet, XmppStream}; +use crate::Error; mod auth; @@ -22,12 +20,12 @@ pub(crate) mod connect; /// Component connection to an XMPP server /// -/// This simplifies the `XMPPStream` to a `Stream`/`Sink` of `Element` +/// This simplifies the `XmppStream` to a `Stream`/`Sink` of `Element` /// (stanzas). Connection handling however is up to the user. pub struct Component { /// The component's Jabber-Id pub jid: Jid, - stream: XMPPStream, + stream: XmppStream, } impl Component { diff --git a/tokio-xmpp/src/connect/mod.rs b/tokio-xmpp/src/connect/mod.rs index 44d1921235f46849c8709ea3b9f751791c90f2af..30a2d85025efa81e54055e92763cf18e7079357b 100644 --- a/tokio-xmpp/src/connect/mod.rs +++ b/tokio-xmpp/src/connect/mod.rs @@ -14,7 +14,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; use xmpp_parsers::jid::Jid; -use crate::xmpp_stream::XMPPStream; +use crate::proto::XmppStream; use crate::Error; #[cfg(feature = "starttls")] @@ -22,7 +22,7 @@ pub mod starttls; #[cfg(feature = "insecure-tcp")] pub mod tcp; -/// trait returned wrapped in XMPPStream by ServerConnector +/// trait returned wrapped in XmppStream by ServerConnector pub trait AsyncReadAndWrite: AsyncRead + AsyncWrite + Unpin + Send {} impl AsyncReadAndWrite for T {} @@ -38,7 +38,7 @@ pub trait ServerConnector: Clone + core::fmt::Debug + Send + Unpin + 'static { &self, jid: &Jid, ns: &str, - ) -> impl std::future::Future, Error>> + Send; + ) -> impl std::future::Future, Error>> + Send; /// Return channel binding data if available /// do not fail if channel binding is simply unavailable, just return Ok(None) diff --git a/tokio-xmpp/src/connect/starttls.rs b/tokio-xmpp/src/connect/starttls.rs index fd27852ddbe44debb7d7ea9446f6c8f7f057754d..7510fc089b82de61ab6568b24e5b08a898950283 100644 --- a/tokio-xmpp/src/connect/starttls.rs +++ b/tokio-xmpp/src/connect/starttls.rs @@ -39,8 +39,7 @@ use xmpp_parsers::{jid::Jid, ns}; use crate::{ connect::{ServerConnector, ServerConnectorError, Tcp}, error::{Error, ProtocolError}, - xmpp_codec::Packet, - xmpp_stream::XMPPStream, + proto::{Packet, XmppStream}, AsyncClient, }; @@ -64,7 +63,7 @@ pub enum ServerConfig { impl ServerConnector for ServerConfig { type Stream = TlsStream; - async fn connect(&self, jid: &Jid, ns: &str) -> Result, Error> { + async fn connect(&self, jid: &Jid, ns: &str) -> Result, Error> { // TCP connection let tcp_stream = match self { ServerConfig::UseSrv => { @@ -73,14 +72,14 @@ impl ServerConnector for ServerConfig { ServerConfig::Manual { host, port } => Tcp::resolve(host.as_str(), *port).await?, }; - // Unencryped XMPPStream - let xmpp_stream = XMPPStream::start(tcp_stream, jid.clone(), ns.to_owned()).await?; + // Unencryped XmppStream + let xmpp_stream = XmppStream::start(tcp_stream, jid.clone(), ns.to_owned()).await?; if xmpp_stream.stream_features.can_starttls() { // TlsStream let tls_stream = starttls(xmpp_stream).await?; - // Encrypted XMPPStream - Ok(XMPPStream::start(tls_stream, jid.clone(), ns.to_owned()).await?) + // Encrypted XmppStream + Ok(XmppStream::start(tls_stream, jid.clone(), ns.to_owned()).await?) } else { return Err(crate::Error::Protocol(ProtocolError::NoTls).into()); } @@ -114,7 +113,7 @@ impl ServerConnector for ServerConfig { #[cfg(feature = "tls-native")] async fn get_tls_stream( - xmpp_stream: XMPPStream, + xmpp_stream: XmppStream, ) -> Result, Error> { let domain = xmpp_stream.jid.domain().to_owned(); let stream = xmpp_stream.into_inner(); @@ -127,7 +126,7 @@ async fn get_tls_stream( #[cfg(all(feature = "tls-rust", not(feature = "tls-native")))] async fn get_tls_stream( - xmpp_stream: XMPPStream, + xmpp_stream: XmppStream, ) -> Result, Error> { let domain = xmpp_stream.jid.domain().to_string(); let domain = ServerName::try_from(domain).map_err(|e| StartTlsError::DnsNameError(e))?; @@ -145,10 +144,10 @@ async fn get_tls_stream( Ok(tls_stream) } -/// Performs `` on an XMPPStream and returns a binary +/// Performs `` on an XmppStream and returns a binary /// TlsStream. pub async fn starttls( - mut xmpp_stream: XMPPStream, + mut xmpp_stream: XmppStream, ) -> Result, Error> { let nonza = Element::builder("starttls", ns::TLS).build(); let packet = Packet::Stanza(nonza); diff --git a/tokio-xmpp/src/connect/tcp.rs b/tokio-xmpp/src/connect/tcp.rs index b2c4c216adbfdcd424fac1ca8f198e4da1813af3..5144d16016519adb5f5ceac2d3f427deeb2029dc 100644 --- a/tokio-xmpp/src/connect/tcp.rs +++ b/tokio-xmpp/src/connect/tcp.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use tokio::net::TcpStream; -use crate::{connect::ServerConnector, xmpp_stream::XMPPStream, Component, Error}; +use crate::{connect::ServerConnector, proto::XmppStream, Component, Error}; /// Component that connects over TCP pub type TcpComponent = Component; @@ -28,11 +28,11 @@ impl ServerConnector for TcpServerConnector { &self, jid: &xmpp_parsers::jid::Jid, ns: &str, - ) -> Result, Error> { + ) -> Result, Error> { let stream = TcpStream::connect(&*self.0) .await .map_err(|e| crate::Error::Io(e))?; - Ok(XMPPStream::start(stream, jid.clone(), ns.to_owned()).await?) + Ok(XmppStream::start(stream, jid.clone(), ns.to_owned()).await?) } } diff --git a/tokio-xmpp/src/lib.rs b/tokio-xmpp/src/lib.rs index f935294aa5babcb5a119122c163ea38d7dee4d6d..205d0f0000569480932dd56fddc196471fb4eaa5 100644 --- a/tokio-xmpp/src/lib.rs +++ b/tokio-xmpp/src/lib.rs @@ -20,14 +20,11 @@ compile_error!( "when starttls feature enabled one of tls-native and tls-rust features must be enabled." ); -mod stream_start; -mod xmpp_codec; -pub use crate::xmpp_codec::{Packet, XmppCodec}; mod event; pub use event::Event; mod client; pub mod connect; -pub mod xmpp_stream; +pub mod proto; pub use client::async_client::{Client as AsyncClient, Config as AsyncConfig}; mod component; diff --git a/tokio-xmpp/src/proto/mod.rs b/tokio-xmpp/src/proto/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..34de560a3fa423cc84c9da623c7751b9bac5a5a4 --- /dev/null +++ b/tokio-xmpp/src/proto/mod.rs @@ -0,0 +1,8 @@ +//! Low-level stream establishment + +mod xmpp_codec; +mod xmpp_stream; + +pub use xmpp_codec::{Packet, XmppCodec}; +pub(crate) use xmpp_stream::add_stanza_id; +pub use xmpp_stream::XmppStream; diff --git a/tokio-xmpp/src/xmpp_codec.rs b/tokio-xmpp/src/proto/xmpp_codec.rs similarity index 100% rename from tokio-xmpp/src/xmpp_codec.rs rename to tokio-xmpp/src/proto/xmpp_codec.rs diff --git a/tokio-xmpp/src/xmpp_stream.rs b/tokio-xmpp/src/proto/xmpp_stream.rs similarity index 54% rename from tokio-xmpp/src/xmpp_stream.rs rename to tokio-xmpp/src/proto/xmpp_stream.rs index a77ff9f1c147132cebbbb2c60052440ada0902cf..2624898d45ade8bef2ac64a4faa7d817204710af 100644 --- a/tokio-xmpp/src/xmpp_stream.rs +++ b/tokio-xmpp/src/proto/xmpp_stream.rs @@ -1,18 +1,21 @@ -//! `XMPPStream` provides encoding/decoding for XMPP - -use futures::sink::Send; -use futures::{sink::SinkExt, task::Poll, Sink, Stream}; +//! `XmppStream` provides encoding/decoding for XMPP + +use futures::{ + sink::{Send, SinkExt}, + stream::StreamExt, + task::Poll, + Sink, Stream, +}; use minidom::Element; use rand::{thread_rng, Rng}; use std::pin::Pin; use std::task::Context; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::codec::Framed; -use xmpp_parsers::{jid::Jid, stream_features::StreamFeatures}; +use xmpp_parsers::{jid::Jid, ns, stream_features::StreamFeatures, Error as ParsersError}; -use crate::stream_start; -use crate::xmpp_codec::{Packet, XmppCodec}; -use crate::Error; +use crate::error::{Error, ProtocolError}; +use crate::proto::{Packet, XmppCodec}; fn make_id() -> String { let id: u64 = thread_rng().gen(); @@ -36,7 +39,7 @@ pub(crate) fn add_stanza_id(mut stanza: Element, default_ns: &str) -> Element { /// and encode XMPP packets. /// /// Implements `Sink + Stream` -pub struct XMPPStream { +pub struct XmppStream { /// The local Jabber-Id pub jid: Jid, /// Codec instance @@ -52,7 +55,7 @@ pub struct XMPPStream { pub id: String, } -impl XMPPStream { +impl XmppStream { /// Constructor pub fn new( jid: Jid, @@ -61,7 +64,7 @@ impl XMPPStream { id: String, stream_features: StreamFeatures, ) -> Self { - XMPPStream { + XmppStream { jid, stream, stream_features, @@ -72,8 +75,62 @@ impl XMPPStream { /// Send a `` start tag pub async fn start(stream: S, jid: Jid, ns: String) -> Result { - let xmpp_stream = Framed::new(stream, XmppCodec::new()); - stream_start::start(xmpp_stream, jid, ns).await + let mut stream = Framed::new(stream, XmppCodec::new()); + let attrs = [ + ("to".to_owned(), jid.domain().to_string()), + ("version".to_owned(), "1.0".to_owned()), + ("xmlns".to_owned(), ns.clone()), + ("xmlns:stream".to_owned(), ns::STREAM.to_owned()), + ] + .iter() + .cloned() + .collect(); + stream.send(Packet::StreamStart(attrs)).await?; + + let stream_attrs; + loop { + match stream.next().await { + Some(Ok(Packet::StreamStart(attrs))) => { + stream_attrs = attrs; + break; + } + Some(Ok(_)) => {} + Some(Err(e)) => return Err(e.into()), + None => return Err(Error::Disconnected), + } + } + + let stream_ns = stream_attrs + .get("xmlns") + .ok_or(ProtocolError::NoStreamNamespace)? + .clone(); + let stream_id = stream_attrs + .get("id") + .ok_or(ProtocolError::NoStreamId)? + .clone(); + if stream_ns == "jabber:client" && stream_attrs.get("version").is_some() { + loop { + match stream.next().await { + Some(Ok(Packet::Stanza(stanza))) => { + let stream_features = StreamFeatures::try_from(stanza) + .map_err(|e| Error::Protocol(ParsersError::from(e).into()))?; + return Ok(XmppStream::new(jid, stream, ns, stream_id, stream_features)); + } + Some(Ok(_)) => {} + Some(Err(e)) => return Err(e.into()), + None => return Err(Error::Disconnected), + } + } + } else { + // FIXME: huge hack, shouldn’t be an element! + return Ok(XmppStream::new( + jid, + stream, + ns, + stream_id.clone(), + StreamFeatures::default(), + )); + } } /// Unwraps the inner stream @@ -88,7 +145,7 @@ impl XMPPStream { } } -impl XMPPStream { +impl XmppStream { /// Convenience method pub fn send_stanza>(&mut self, e: E) -> Send { self.send(Packet::Stanza(e.into())) @@ -96,7 +153,7 @@ impl XMPPStream { } /// Proxy to self.stream -impl Sink for XMPPStream { +impl Sink for XmppStream { type Error = crate::Error; fn poll_ready(self: Pin<&mut Self>, _ctx: &mut Context) -> Poll> { @@ -125,7 +182,7 @@ impl Sink for XMPPStream { } /// Proxy to self.stream -impl Stream for XMPPStream { +impl Stream for XmppStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { diff --git a/tokio-xmpp/src/stream_start.rs b/tokio-xmpp/src/stream_start.rs deleted file mode 100644 index b11349fd2cd900a28c44fc55263f22b6aecda26b..0000000000000000000000000000000000000000 --- a/tokio-xmpp/src/stream_start.rs +++ /dev/null @@ -1,72 +0,0 @@ -use futures::{sink::SinkExt, stream::StreamExt}; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio_util::codec::Framed; -use xmpp_parsers::{jid::Jid, ns, stream_features::StreamFeatures, Error as ParsersError}; - -use crate::error::{Error, ProtocolError}; -use crate::xmpp_codec::{Packet, XmppCodec}; -use crate::xmpp_stream::XMPPStream; - -/// Sends a ``, then wait for one from the server, and -/// construct an XMPPStream. -pub async fn start( - mut stream: Framed, - jid: Jid, - ns: String, -) -> Result, Error> { - let attrs = [ - ("to".to_owned(), jid.domain().to_string()), - ("version".to_owned(), "1.0".to_owned()), - ("xmlns".to_owned(), ns.clone()), - ("xmlns:stream".to_owned(), ns::STREAM.to_owned()), - ] - .iter() - .cloned() - .collect(); - stream.send(Packet::StreamStart(attrs)).await?; - - let stream_attrs; - loop { - match stream.next().await { - Some(Ok(Packet::StreamStart(attrs))) => { - stream_attrs = attrs; - break; - } - Some(Ok(_)) => {} - Some(Err(e)) => return Err(e.into()), - None => return Err(Error::Disconnected), - } - } - - let stream_ns = stream_attrs - .get("xmlns") - .ok_or(ProtocolError::NoStreamNamespace)? - .clone(); - let stream_id = stream_attrs - .get("id") - .ok_or(ProtocolError::NoStreamId)? - .clone(); - if stream_ns == "jabber:client" && stream_attrs.get("version").is_some() { - loop { - match stream.next().await { - Some(Ok(Packet::Stanza(stanza))) => { - let stream_features = StreamFeatures::try_from(stanza) - .map_err(|e| Error::Protocol(ParsersError::from(e).into()))?; - return Ok(XMPPStream::new(jid, stream, ns, stream_id, stream_features)); - } - Some(Ok(_)) => {} - Some(Err(e)) => return Err(e.into()), - None => return Err(Error::Disconnected), - } - } - } else { - // FIXME: huge hack, shouldn’t be an element! - return Ok(XMPPStream::new( - jid, - stream, - ns, - stream_id.clone(), - StreamFeatures::default(), - )); - } -}