async_client.rs

  1use futures::{sink::SinkExt, task::Poll, Future, Sink, Stream};
  2use minidom::Element;
  3use std::mem::replace;
  4use std::pin::Pin;
  5use std::task::Context;
  6use tokio::task::JoinHandle;
  7use xmpp_parsers::{jid::Jid, ns};
  8
  9use super::connect::client_login;
 10use crate::connect::{AsyncReadAndWrite, ServerConnector};
 11use crate::event::Event;
 12use crate::stream_features::StreamFeatures;
 13use crate::xmpp_codec::Packet;
 14use crate::xmpp_stream::{add_stanza_id, XMPPStream};
 15use crate::{Error, ProtocolError};
 16
 17/// XMPP client connection and state
 18///
 19/// It is able to reconnect. TODO: implement session management.
 20///
 21/// This implements the `futures` crate's [`Stream`](#impl-Stream) and
 22/// [`Sink`](#impl-Sink<Packet>) traits.
 23pub struct Client<C: ServerConnector> {
 24    config: Config<C>,
 25    state: ClientState<C::Stream>,
 26    reconnect: bool,
 27    // TODO: tls_required=true
 28}
 29
 30/// XMPP client configuration
 31#[derive(Clone, Debug)]
 32pub struct Config<C> {
 33    /// jid of the account
 34    pub jid: Jid,
 35    /// password of the account
 36    pub password: String,
 37    /// server configuration for the account
 38    pub server: C,
 39}
 40
 41enum ClientState<S: AsyncReadAndWrite> {
 42    Invalid,
 43    Disconnected,
 44    Connecting(JoinHandle<Result<XMPPStream<S>, Error>>),
 45    Connected(XMPPStream<S>),
 46}
 47
 48impl<C: ServerConnector> Client<C> {
 49    /// Start a new client given that the JID is already parsed.
 50    pub fn new_with_config(config: Config<C>) -> Self {
 51        let connect = tokio::spawn(client_login(
 52            config.server.clone(),
 53            config.jid.clone(),
 54            config.password.clone(),
 55        ));
 56        let client = Client {
 57            config,
 58            state: ClientState::Connecting(connect),
 59            reconnect: false,
 60        };
 61        client
 62    }
 63
 64    /// Set whether to reconnect (`true`) or let the stream end
 65    /// (`false`) when a connection to the server has ended.
 66    pub fn set_reconnect(&mut self, reconnect: bool) -> &mut Self {
 67        self.reconnect = reconnect;
 68        self
 69    }
 70
 71    /// Get the client's bound JID (the one reported by the XMPP
 72    /// server).
 73    pub fn bound_jid(&self) -> Option<&Jid> {
 74        match self.state {
 75            ClientState::Connected(ref stream) => Some(&stream.jid),
 76            _ => None,
 77        }
 78    }
 79
 80    /// Send stanza
 81    pub async fn send_stanza(&mut self, stanza: Element) -> Result<(), Error> {
 82        self.send(Packet::Stanza(add_stanza_id(stanza, ns::JABBER_CLIENT)))
 83            .await
 84    }
 85
 86    /// Get the stream features (`<stream:features/>`) of the underlying stream
 87    pub fn get_stream_features(&self) -> Option<&StreamFeatures> {
 88        match self.state {
 89            ClientState::Connected(ref stream) => Some(&stream.stream_features),
 90            _ => None,
 91        }
 92    }
 93
 94    /// End connection by sending `</stream:stream>`
 95    ///
 96    /// You may expect the server to respond with the same. This
 97    /// client will then drop its connection.
 98    ///
 99    /// Make sure to disable reconnect.
100    pub async fn send_end(&mut self) -> Result<(), Error> {
101        self.send(Packet::StreamEnd).await
102    }
103}
104
105/// Incoming XMPP events
106///
107/// In an `async fn` you may want to use this with `use
108/// futures::stream::StreamExt;`
109impl<C: ServerConnector> Stream for Client<C> {
110    type Item = Event;
111
112    /// Low-level read on the XMPP stream, allowing the underlying
113    /// machinery to:
114    ///
115    /// * connect,
116    /// * starttls,
117    /// * authenticate,
118    /// * bind a session, and finally
119    /// * receive stanzas
120    ///
121    /// ...for your client
122    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
123        let state = replace(&mut self.state, ClientState::Invalid);
124
125        match state {
126            ClientState::Invalid => panic!("Invalid client state"),
127            ClientState::Disconnected if self.reconnect => {
128                // TODO: add timeout
129                let connect = tokio::spawn(client_login(
130                    self.config.server.clone(),
131                    self.config.jid.clone(),
132                    self.config.password.clone(),
133                ));
134                self.state = ClientState::Connecting(connect);
135                self.poll_next(cx)
136            }
137            ClientState::Disconnected => {
138                self.state = ClientState::Disconnected;
139                Poll::Pending
140            }
141            ClientState::Connecting(mut connect) => match Pin::new(&mut connect).poll(cx) {
142                Poll::Ready(Ok(Ok(stream))) => {
143                    let bound_jid = stream.jid.clone();
144                    self.state = ClientState::Connected(stream);
145                    Poll::Ready(Some(Event::Online {
146                        bound_jid,
147                        resumed: false,
148                    }))
149                }
150                Poll::Ready(Ok(Err(e))) => {
151                    self.state = ClientState::Disconnected;
152                    return Poll::Ready(Some(Event::Disconnected(e.into())));
153                }
154                Poll::Ready(Err(e)) => {
155                    self.state = ClientState::Disconnected;
156                    panic!("connect task: {}", e);
157                }
158                Poll::Pending => {
159                    self.state = ClientState::Connecting(connect);
160                    Poll::Pending
161                }
162            },
163            ClientState::Connected(mut stream) => {
164                // Poll sink
165                match Pin::new(&mut stream).poll_ready(cx) {
166                    Poll::Pending => (),
167                    Poll::Ready(Ok(())) => (),
168                    Poll::Ready(Err(e)) => {
169                        self.state = ClientState::Disconnected;
170                        return Poll::Ready(Some(Event::Disconnected(e.into())));
171                    }
172                };
173
174                // Poll stream
175                //
176                // This needs to be a loop in order to ignore packets we don’t care about, or those
177                // we want to handle elsewhere.  Returning something isn’t correct in those two
178                // cases because it would signal to tokio that the XMPPStream is also done, while
179                // there could be additional packets waiting for us.
180                //
181                // The proper solution is thus a loop which we exit once we have something to
182                // return.
183                loop {
184                    match Pin::new(&mut stream).poll_next(cx) {
185                        Poll::Ready(None) => {
186                            // EOF
187                            self.state = ClientState::Disconnected;
188                            return Poll::Ready(Some(Event::Disconnected(Error::Disconnected)));
189                        }
190                        Poll::Ready(Some(Ok(Packet::Stanza(stanza)))) => {
191                            // Receive stanza
192                            self.state = ClientState::Connected(stream);
193                            return Poll::Ready(Some(Event::Stanza(stanza)));
194                        }
195                        Poll::Ready(Some(Ok(Packet::Text(_)))) => {
196                            // Ignore text between stanzas
197                        }
198                        Poll::Ready(Some(Ok(Packet::StreamStart(_)))) => {
199                            // <stream:stream>
200                            self.state = ClientState::Disconnected;
201                            return Poll::Ready(Some(Event::Disconnected(
202                                ProtocolError::InvalidStreamStart.into(),
203                            )));
204                        }
205                        Poll::Ready(Some(Ok(Packet::StreamEnd))) => {
206                            // End of stream: </stream:stream>
207                            self.state = ClientState::Disconnected;
208                            return Poll::Ready(Some(Event::Disconnected(Error::Disconnected)));
209                        }
210                        Poll::Pending => {
211                            // Try again later
212                            self.state = ClientState::Connected(stream);
213                            return Poll::Pending;
214                        }
215                        Poll::Ready(Some(Err(e))) => {
216                            self.state = ClientState::Disconnected;
217                            return Poll::Ready(Some(Event::Disconnected(e.into())));
218                        }
219                    }
220                }
221            }
222        }
223    }
224}
225
226/// Outgoing XMPP packets
227///
228/// See `send_stanza()` for an `async fn`
229impl<C: ServerConnector> Sink<Packet> for Client<C> {
230    type Error = Error;
231
232    fn start_send(mut self: Pin<&mut Self>, item: Packet) -> Result<(), Self::Error> {
233        match self.state {
234            ClientState::Connected(ref mut stream) => {
235                Pin::new(stream).start_send(item).map_err(|e| e.into())
236            }
237            _ => Err(Error::InvalidState),
238        }
239    }
240
241    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
242        match self.state {
243            ClientState::Connected(ref mut stream) => {
244                Pin::new(stream).poll_ready(cx).map_err(|e| e.into())
245            }
246            _ => Poll::Pending,
247        }
248    }
249
250    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
251        match self.state {
252            ClientState::Connected(ref mut stream) => {
253                Pin::new(stream).poll_flush(cx).map_err(|e| e.into())
254            }
255            _ => Poll::Pending,
256        }
257    }
258
259    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
260        match self.state {
261            ClientState::Connected(ref mut stream) => {
262                Pin::new(stream).poll_close(cx).map_err(|e| e.into())
263            }
264            _ => Poll::Pending,
265        }
266    }
267}