async_client.rs

  1use futures::{sink::SinkExt, task::Poll, Future, Sink, Stream};
  2use std::mem::replace;
  3use std::pin::Pin;
  4use std::task::Context;
  5use tokio::task::JoinHandle;
  6use xmpp_parsers::{ns, Element, Jid};
  7
  8use super::connect::client_login;
  9use crate::connect::{AsyncReadAndWrite, ServerConnector};
 10use crate::event::Event;
 11use crate::stream_features::StreamFeatures;
 12use crate::xmpp_codec::Packet;
 13use crate::xmpp_stream::{add_stanza_id, XMPPStream};
 14use crate::{Error, ProtocolError};
 15
 16/// XMPP client connection and state
 17///
 18/// It is able to reconnect. TODO: implement session management.
 19///
 20/// This implements the `futures` crate's [`Stream`](#impl-Stream) and
 21/// [`Sink`](#impl-Sink<Packet>) traits.
 22pub struct Client<C: ServerConnector> {
 23    config: Config<C>,
 24    state: ClientState<C::Stream>,
 25    reconnect: bool,
 26    // TODO: tls_required=true
 27}
 28
 29/// XMPP client configuration
 30#[derive(Clone, Debug)]
 31pub struct Config<C> {
 32    /// jid of the account
 33    pub jid: Jid,
 34    /// password of the account
 35    pub password: String,
 36    /// server configuration for the account
 37    pub server: C,
 38}
 39
 40enum ClientState<S: AsyncReadAndWrite> {
 41    Invalid,
 42    Disconnected,
 43    Connecting(JoinHandle<Result<XMPPStream<S>, Error>>),
 44    Connected(XMPPStream<S>),
 45}
 46
 47impl<C: ServerConnector> Client<C> {
 48    /// Start a new client given that the JID is already parsed.
 49    pub fn new_with_config(config: Config<C>) -> Self {
 50        let connect = tokio::spawn(client_login(
 51            config.server.clone(),
 52            config.jid.clone(),
 53            config.password.clone(),
 54        ));
 55        let client = Client {
 56            config,
 57            state: ClientState::Connecting(connect),
 58            reconnect: false,
 59        };
 60        client
 61    }
 62
 63    /// Set whether to reconnect (`true`) or let the stream end
 64    /// (`false`) when a connection to the server has ended.
 65    pub fn set_reconnect(&mut self, reconnect: bool) -> &mut Self {
 66        self.reconnect = reconnect;
 67        self
 68    }
 69
 70    /// Get the client's bound JID (the one reported by the XMPP
 71    /// server).
 72    pub fn bound_jid(&self) -> Option<&Jid> {
 73        match self.state {
 74            ClientState::Connected(ref stream) => Some(&stream.jid),
 75            _ => None,
 76        }
 77    }
 78
 79    /// Send stanza
 80    pub async fn send_stanza(&mut self, stanza: Element) -> Result<(), Error> {
 81        self.send(Packet::Stanza(add_stanza_id(stanza, ns::JABBER_CLIENT)))
 82            .await
 83    }
 84
 85    /// Get the stream features (`<stream:features/>`) of the underlying stream
 86    pub fn get_stream_features(&self) -> Option<&StreamFeatures> {
 87        match self.state {
 88            ClientState::Connected(ref stream) => Some(&stream.stream_features),
 89            _ => None,
 90        }
 91    }
 92
 93    /// End connection by sending `</stream:stream>`
 94    ///
 95    /// You may expect the server to respond with the same. This
 96    /// client will then drop its connection.
 97    ///
 98    /// Make sure to disable reconnect.
 99    pub async fn send_end(&mut self) -> Result<(), Error> {
100        self.send(Packet::StreamEnd).await
101    }
102}
103
104/// Incoming XMPP events
105///
106/// In an `async fn` you may want to use this with `use
107/// futures::stream::StreamExt;`
108impl<C: ServerConnector> Stream for Client<C> {
109    type Item = Event;
110
111    /// Low-level read on the XMPP stream, allowing the underlying
112    /// machinery to:
113    ///
114    /// * connect,
115    /// * starttls,
116    /// * authenticate,
117    /// * bind a session, and finally
118    /// * receive stanzas
119    ///
120    /// ...for your client
121    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
122        let state = replace(&mut self.state, ClientState::Invalid);
123
124        match state {
125            ClientState::Invalid => panic!("Invalid client state"),
126            ClientState::Disconnected if self.reconnect => {
127                // TODO: add timeout
128                let connect = tokio::spawn(client_login(
129                    self.config.server.clone(),
130                    self.config.jid.clone(),
131                    self.config.password.clone(),
132                ));
133                self.state = ClientState::Connecting(connect);
134                self.poll_next(cx)
135            }
136            ClientState::Disconnected => {
137                self.state = ClientState::Disconnected;
138                Poll::Pending
139            }
140            ClientState::Connecting(mut connect) => match Pin::new(&mut connect).poll(cx) {
141                Poll::Ready(Ok(Ok(stream))) => {
142                    let bound_jid = stream.jid.clone();
143                    self.state = ClientState::Connected(stream);
144                    Poll::Ready(Some(Event::Online {
145                        bound_jid,
146                        resumed: false,
147                    }))
148                }
149                Poll::Ready(Ok(Err(e))) => {
150                    self.state = ClientState::Disconnected;
151                    return Poll::Ready(Some(Event::Disconnected(e.into())));
152                }
153                Poll::Ready(Err(e)) => {
154                    self.state = ClientState::Disconnected;
155                    panic!("connect task: {}", e);
156                }
157                Poll::Pending => {
158                    self.state = ClientState::Connecting(connect);
159                    Poll::Pending
160                }
161            },
162            ClientState::Connected(mut stream) => {
163                // Poll sink
164                match Pin::new(&mut stream).poll_ready(cx) {
165                    Poll::Pending => (),
166                    Poll::Ready(Ok(())) => (),
167                    Poll::Ready(Err(e)) => {
168                        self.state = ClientState::Disconnected;
169                        return Poll::Ready(Some(Event::Disconnected(e.into())));
170                    }
171                };
172
173                // Poll stream
174                //
175                // This needs to be a loop in order to ignore packets we don’t care about, or those
176                // we want to handle elsewhere.  Returning something isn’t correct in those two
177                // cases because it would signal to tokio that the XMPPStream is also done, while
178                // there could be additional packets waiting for us.
179                //
180                // The proper solution is thus a loop which we exit once we have something to
181                // return.
182                loop {
183                    match Pin::new(&mut stream).poll_next(cx) {
184                        Poll::Ready(None) => {
185                            // EOF
186                            self.state = ClientState::Disconnected;
187                            return Poll::Ready(Some(Event::Disconnected(Error::Disconnected)));
188                        }
189                        Poll::Ready(Some(Ok(Packet::Stanza(stanza)))) => {
190                            // Receive stanza
191                            self.state = ClientState::Connected(stream);
192                            return Poll::Ready(Some(Event::Stanza(stanza)));
193                        }
194                        Poll::Ready(Some(Ok(Packet::Text(_)))) => {
195                            // Ignore text between stanzas
196                        }
197                        Poll::Ready(Some(Ok(Packet::StreamStart(_)))) => {
198                            // <stream:stream>
199                            self.state = ClientState::Disconnected;
200                            return Poll::Ready(Some(Event::Disconnected(
201                                ProtocolError::InvalidStreamStart.into(),
202                            )));
203                        }
204                        Poll::Ready(Some(Ok(Packet::StreamEnd))) => {
205                            // End of stream: </stream:stream>
206                            self.state = ClientState::Disconnected;
207                            return Poll::Ready(Some(Event::Disconnected(Error::Disconnected)));
208                        }
209                        Poll::Pending => {
210                            // Try again later
211                            self.state = ClientState::Connected(stream);
212                            return Poll::Pending;
213                        }
214                        Poll::Ready(Some(Err(e))) => {
215                            self.state = ClientState::Disconnected;
216                            return Poll::Ready(Some(Event::Disconnected(e.into())));
217                        }
218                    }
219                }
220            }
221        }
222    }
223}
224
225/// Outgoing XMPP packets
226///
227/// See `send_stanza()` for an `async fn`
228impl<C: ServerConnector> Sink<Packet> for Client<C> {
229    type Error = Error;
230
231    fn start_send(mut self: Pin<&mut Self>, item: Packet) -> Result<(), Self::Error> {
232        match self.state {
233            ClientState::Connected(ref mut stream) => {
234                Pin::new(stream).start_send(item).map_err(|e| e.into())
235            }
236            _ => Err(Error::InvalidState),
237        }
238    }
239
240    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
241        match self.state {
242            ClientState::Connected(ref mut stream) => {
243                Pin::new(stream).poll_ready(cx).map_err(|e| e.into())
244            }
245            _ => Poll::Pending,
246        }
247    }
248
249    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
250        match self.state {
251            ClientState::Connected(ref mut stream) => {
252                Pin::new(stream).poll_flush(cx).map_err(|e| e.into())
253            }
254            _ => Poll::Pending,
255        }
256    }
257
258    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
259        match self.state {
260            ClientState::Connected(ref mut stream) => {
261                Pin::new(stream).poll_close(cx).map_err(|e| e.into())
262            }
263            _ => Poll::Pending,
264        }
265    }
266}