1use futures::{sink::SinkExt, Sink, Stream};
2use idna;
3use sasl::common::{ChannelBinding, Credentials};
4use std::pin::Pin;
5use std::str::FromStr;
6use std::task::{Context, Poll};
7use tokio::net::TcpStream;
8#[cfg(feature = "tls-native")]
9use tokio_native_tls::TlsStream;
10#[cfg(feature = "tls-rust")]
11use tokio_rustls::client::TlsStream;
12use tokio_stream::StreamExt;
13use xmpp_parsers::{ns, Element, Jid};
14
15use super::auth::auth;
16use super::bind::bind;
17use crate::happy_eyeballs::connect_with_srv;
18use crate::starttls::starttls;
19use crate::xmpp_codec::Packet;
20use crate::xmpp_stream::{self, add_stanza_id};
21use crate::{Error, ProtocolError};
22
/// A simple XMPP client connection
///
/// This implements the `futures` crate's [`Stream`](#impl-Stream) and
/// [`Sink`](#impl-Sink<Packet>) traits.
///
/// Reading from the client as a `Stream` yields `Result<Element, Error>`
/// items; writing to it as a `Sink` accepts `Packet` values. Both
/// directions are forwarded to the wrapped XMPP stream.
pub struct Client {
    // The underlying XMPP stream that all `Stream`/`Sink` calls are
    // delegated to; handed out by value via `into_inner()`.
    stream: XMPPStream,
}
30
// Concrete stream type wrapped by `Client`: an `xmpp_stream::XMPPStream`
// layered on a `TlsStream` over a `TcpStream`. `TlsStream` is imported from
// `tokio_native_tls` under the `tls-native` feature or from
// `tokio_rustls::client` under the `tls-rust` feature.
type XMPPStream = xmpp_stream::XMPPStream<TlsStream<TcpStream>>;
32
33impl Client {
34 /// Start a new XMPP client and wait for a usable session
35 pub async fn new<P: Into<String>>(jid: &str, password: P) -> Result<Self, Error> {
36 let jid = Jid::from_str(jid)?;
37 let client = Self::new_with_jid(jid, password.into()).await?;
38 Ok(client)
39 }
40
41 /// Start a new client given that the JID is already parsed.
42 pub async fn new_with_jid(jid: Jid, password: String) -> Result<Self, Error> {
43 let stream = Self::connect(jid.clone(), password.clone()).await?;
44 Ok(Client { stream })
45 }
46
47 /// Get direct access to inner XMPP Stream
48 pub fn into_inner(self) -> XMPPStream {
49 self.stream
50 }
51
52 async fn connect(jid: Jid, password: String) -> Result<XMPPStream, Error> {
53 let username = jid.clone().node().unwrap();
54 let password = password;
55 let domain = idna::domain_to_ascii(&jid.clone().domain()).map_err(|_| Error::Idna)?;
56
57 // TCP connection
58 let tcp_stream = connect_with_srv(&domain, "_xmpp-client._tcp", 5222).await?;
59
60 // Unencryped XMPPStream
61 let xmpp_stream =
62 xmpp_stream::XMPPStream::start(tcp_stream, jid.clone(), ns::JABBER_CLIENT.to_owned())
63 .await?;
64
65 let xmpp_stream = if xmpp_stream.stream_features.can_starttls() {
66 // TlsStream
67 let tls_stream = starttls(xmpp_stream).await?;
68 // Encrypted XMPPStream
69 xmpp_stream::XMPPStream::start(tls_stream, jid.clone(), ns::JABBER_CLIENT.to_owned())
70 .await?
71 } else {
72 return Err(Error::Protocol(ProtocolError::NoTls));
73 };
74
75 let creds = Credentials::default()
76 .with_username(username)
77 .with_password(password)
78 .with_channel_binding(ChannelBinding::None);
79 // Authenticated (unspecified) stream
80 let stream = auth(xmpp_stream, creds).await?;
81 // Authenticated XMPPStream
82 let xmpp_stream =
83 xmpp_stream::XMPPStream::start(stream, jid, ns::JABBER_CLIENT.to_owned()).await?;
84
85 // XMPPStream bound to user session
86 let xmpp_stream = bind(xmpp_stream).await?;
87 Ok(xmpp_stream)
88 }
89
90 /// Get the client's bound JID (the one reported by the XMPP
91 /// server).
92 pub fn bound_jid(&self) -> &Jid {
93 &self.stream.jid
94 }
95
96 /// Send stanza
97 pub async fn send_stanza<E>(&mut self, stanza: E) -> Result<(), Error>
98 where
99 E: Into<Element>,
100 {
101 self.send(Packet::Stanza(add_stanza_id(
102 stanza.into(),
103 ns::JABBER_CLIENT,
104 )))
105 .await
106 }
107
108 /// End connection by sending `</stream:stream>`
109 ///
110 /// You may expect the server to respond with the same. This
111 /// client will then drop its connection.
112 pub async fn end(mut self) -> Result<(), Error> {
113 self.send(Packet::StreamEnd).await?;
114
115 // Wait for stream end from server
116 while let Some(Ok(_)) = self.next().await {}
117
118 Ok(())
119 }
120}
121
122/// Incoming XMPP events
123///
124/// In an `async fn` you may want to use this with `use
125/// futures::stream::StreamExt;`
126impl Stream for Client {
127 type Item = Result<Element, Error>;
128
129 /// Low-level read on the XMPP stream
130 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
131 loop {
132 match Pin::new(&mut self.stream).poll_next(cx) {
133 Poll::Pending => return Poll::Pending,
134 Poll::Ready(Some(Ok(Packet::Stanza(stanza)))) => {
135 return Poll::Ready(Some(Ok(stanza)))
136 }
137 Poll::Ready(Some(Ok(Packet::Text(_)))) => {
138 // Ignore, retry
139 }
140 Poll::Ready(_) =>
141 // Unexpected and errors, just end
142 {
143 return Poll::Ready(None)
144 }
145 }
146 }
147 }
148}
149
150/// Outgoing XMPP packets
151///
152/// See `send_stanza()` for an `async fn`
153impl Sink<Packet> for Client {
154 type Error = Error;
155
156 fn start_send(mut self: Pin<&mut Self>, item: Packet) -> Result<(), Self::Error> {
157 Pin::new(&mut self.stream).start_send(item)
158 }
159
160 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
161 Pin::new(&mut self.stream).poll_ready(cx)
162 }
163
164 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
165 Pin::new(&mut self.stream).poll_flush(cx)
166 }
167
168 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
169 Pin::new(&mut self.stream).poll_close(cx)
170 }
171}