simple_client.rs

  1use futures::{sink::SinkExt, Sink, Stream};
  2use idna;
  3use sasl::common::{ChannelBinding, Credentials};
  4use std::pin::Pin;
  5use std::str::FromStr;
  6use std::task::{Context, Poll};
  7use tokio::net::TcpStream;
  8#[cfg(feature = "tls-native")]
  9use tokio_native_tls::TlsStream;
 10#[cfg(feature = "tls-rust")]
 11use tokio_rustls::client::TlsStream;
 12use tokio_stream::StreamExt;
 13use xmpp_parsers::{ns, Element, Jid};
 14
 15use super::auth::auth;
 16use super::bind::bind;
 17use crate::happy_eyeballs::connect_with_srv;
 18use crate::starttls::starttls;
 19use crate::xmpp_codec::Packet;
 20use crate::xmpp_stream::{self, add_stanza_id};
 21use crate::{Error, ProtocolError};
 22
 23/// A simple XMPP client connection
 24///
 25/// This implements the `futures` crate's [`Stream`](#impl-Stream) and
 26/// [`Sink`](#impl-Sink<Packet>) traits.
 27pub struct Client {
 28    stream: XMPPStream,
 29}
 30
 31type XMPPStream = xmpp_stream::XMPPStream<TlsStream<TcpStream>>;
 32
 33impl Client {
 34    /// Start a new XMPP client and wait for a usable session
 35    pub async fn new<P: Into<String>>(jid: &str, password: P) -> Result<Self, Error> {
 36        let jid = Jid::from_str(jid)?;
 37        let client = Self::new_with_jid(jid, password.into()).await?;
 38        Ok(client)
 39    }
 40
 41    /// Start a new client given that the JID is already parsed.
 42    pub async fn new_with_jid(jid: Jid, password: String) -> Result<Self, Error> {
 43        let stream = Self::connect(jid.clone(), password.clone()).await?;
 44        Ok(Client { stream })
 45    }
 46
 47    /// Get direct access to inner XMPP Stream
 48    pub fn into_inner(self) -> XMPPStream {
 49        self.stream
 50    }
 51
 52    async fn connect(jid: Jid, password: String) -> Result<XMPPStream, Error> {
 53        let username = jid.clone().node().unwrap();
 54        let password = password;
 55        let domain = idna::domain_to_ascii(&jid.clone().domain()).map_err(|_| Error::Idna)?;
 56
 57        // TCP connection
 58        let tcp_stream = connect_with_srv(&domain, "_xmpp-client._tcp", 5222).await?;
 59
 60        // Unencryped XMPPStream
 61        let xmpp_stream =
 62            xmpp_stream::XMPPStream::start(tcp_stream, jid.clone(), ns::JABBER_CLIENT.to_owned())
 63                .await?;
 64
 65        let xmpp_stream = if xmpp_stream.stream_features.can_starttls() {
 66            // TlsStream
 67            let tls_stream = starttls(xmpp_stream).await?;
 68            // Encrypted XMPPStream
 69            xmpp_stream::XMPPStream::start(tls_stream, jid.clone(), ns::JABBER_CLIENT.to_owned())
 70                .await?
 71        } else {
 72            return Err(Error::Protocol(ProtocolError::NoTls));
 73        };
 74
 75        let creds = Credentials::default()
 76            .with_username(username)
 77            .with_password(password)
 78            .with_channel_binding(ChannelBinding::None);
 79        // Authenticated (unspecified) stream
 80        let stream = auth(xmpp_stream, creds).await?;
 81        // Authenticated XMPPStream
 82        let xmpp_stream =
 83            xmpp_stream::XMPPStream::start(stream, jid, ns::JABBER_CLIENT.to_owned()).await?;
 84
 85        // XMPPStream bound to user session
 86        let xmpp_stream = bind(xmpp_stream).await?;
 87        Ok(xmpp_stream)
 88    }
 89
 90    /// Get the client's bound JID (the one reported by the XMPP
 91    /// server).
 92    pub fn bound_jid(&self) -> &Jid {
 93        &self.stream.jid
 94    }
 95
 96    /// Send stanza
 97    pub async fn send_stanza<E>(&mut self, stanza: E) -> Result<(), Error>
 98    where
 99        E: Into<Element>,
100    {
101        self.send(Packet::Stanza(add_stanza_id(
102            stanza.into(),
103            ns::JABBER_CLIENT,
104        )))
105        .await
106    }
107
108    /// End connection by sending `</stream:stream>`
109    ///
110    /// You may expect the server to respond with the same. This
111    /// client will then drop its connection.
112    pub async fn end(mut self) -> Result<(), Error> {
113        self.send(Packet::StreamEnd).await?;
114
115        // Wait for stream end from server
116        while let Some(Ok(_)) = self.next().await {}
117
118        Ok(())
119    }
120}
121
122/// Incoming XMPP events
123///
124/// In an `async fn` you may want to use this with `use
125/// futures::stream::StreamExt;`
126impl Stream for Client {
127    type Item = Result<Element, Error>;
128
129    /// Low-level read on the XMPP stream
130    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
131        loop {
132            match Pin::new(&mut self.stream).poll_next(cx) {
133                Poll::Pending => return Poll::Pending,
134                Poll::Ready(Some(Ok(Packet::Stanza(stanza)))) => {
135                    return Poll::Ready(Some(Ok(stanza)))
136                }
137                Poll::Ready(Some(Ok(Packet::Text(_)))) => {
138                    // Ignore, retry
139                }
140                Poll::Ready(_) =>
141                // Unexpected and errors, just end
142                {
143                    return Poll::Ready(None)
144                }
145            }
146        }
147    }
148}
149
150/// Outgoing XMPP packets
151///
152/// See `send_stanza()` for an `async fn`
153impl Sink<Packet> for Client {
154    type Error = Error;
155
156    fn start_send(mut self: Pin<&mut Self>, item: Packet) -> Result<(), Self::Error> {
157        Pin::new(&mut self.stream).start_send(item)
158    }
159
160    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
161        Pin::new(&mut self.stream).poll_ready(cx)
162    }
163
164    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
165        Pin::new(&mut self.stream).poll_flush(cx)
166    }
167
168    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
169        Pin::new(&mut self.stream).poll_close(cx)
170    }
171}