1use futures::{sink::SinkExt, task::Poll, Future, Sink, Stream};
2use minidom::Element;
3use std::mem::replace;
4use std::pin::Pin;
5use std::task::Context;
6use tokio::task::JoinHandle;
7use xmpp_parsers::{jid::Jid, ns};
8
9use super::connect::client_login;
10use crate::connect::{AsyncReadAndWrite, ServerConnector};
11use crate::event::Event;
12use crate::stream_features::StreamFeatures;
13use crate::xmpp_codec::Packet;
14use crate::xmpp_stream::{add_stanza_id, XMPPStream};
15use crate::{Error, ProtocolError};
16
17/// XMPP client connection and state
18///
19/// It is able to reconnect. TODO: implement session management.
20///
21/// This implements the `futures` crate's [`Stream`](#impl-Stream) and
22/// [`Sink`](#impl-Sink<Packet>) traits.
23pub struct Client<C: ServerConnector> {
24 config: Config<C>,
25 state: ClientState<C::Stream>,
26 reconnect: bool,
27 // TODO: tls_required=true
28}
29
30/// XMPP client configuration
31#[derive(Clone, Debug)]
32pub struct Config<C> {
33 /// jid of the account
34 pub jid: Jid,
35 /// password of the account
36 pub password: String,
37 /// server configuration for the account
38 pub server: C,
39}
40
41enum ClientState<S: AsyncReadAndWrite> {
42 Invalid,
43 Disconnected,
44 Connecting(JoinHandle<Result<XMPPStream<S>, Error>>),
45 Connected(XMPPStream<S>),
46}
47
48impl<C: ServerConnector> Client<C> {
49 /// Start a new client given that the JID is already parsed.
50 pub fn new_with_config(config: Config<C>) -> Self {
51 let connect = tokio::spawn(client_login(
52 config.server.clone(),
53 config.jid.clone(),
54 config.password.clone(),
55 ));
56 let client = Client {
57 config,
58 state: ClientState::Connecting(connect),
59 reconnect: false,
60 };
61 client
62 }
63
64 /// Set whether to reconnect (`true`) or let the stream end
65 /// (`false`) when a connection to the server has ended.
66 pub fn set_reconnect(&mut self, reconnect: bool) -> &mut Self {
67 self.reconnect = reconnect;
68 self
69 }
70
71 /// Get the client's bound JID (the one reported by the XMPP
72 /// server).
73 pub fn bound_jid(&self) -> Option<&Jid> {
74 match self.state {
75 ClientState::Connected(ref stream) => Some(&stream.jid),
76 _ => None,
77 }
78 }
79
80 /// Send stanza
81 pub async fn send_stanza(&mut self, stanza: Element) -> Result<(), Error> {
82 self.send(Packet::Stanza(add_stanza_id(stanza, ns::JABBER_CLIENT)))
83 .await
84 }
85
86 /// Get the stream features (`<stream:features/>`) of the underlying stream
87 pub fn get_stream_features(&self) -> Option<&StreamFeatures> {
88 match self.state {
89 ClientState::Connected(ref stream) => Some(&stream.stream_features),
90 _ => None,
91 }
92 }
93
94 /// End connection by sending `</stream:stream>`
95 ///
96 /// You may expect the server to respond with the same. This
97 /// client will then drop its connection.
98 ///
99 /// Make sure to disable reconnect.
100 pub async fn send_end(&mut self) -> Result<(), Error> {
101 self.send(Packet::StreamEnd).await
102 }
103}
104
105/// Incoming XMPP events
106///
107/// In an `async fn` you may want to use this with `use
108/// futures::stream::StreamExt;`
109impl<C: ServerConnector> Stream for Client<C> {
110 type Item = Event;
111
112 /// Low-level read on the XMPP stream, allowing the underlying
113 /// machinery to:
114 ///
115 /// * connect,
116 /// * starttls,
117 /// * authenticate,
118 /// * bind a session, and finally
119 /// * receive stanzas
120 ///
121 /// ...for your client
122 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
123 let state = replace(&mut self.state, ClientState::Invalid);
124
125 match state {
126 ClientState::Invalid => panic!("Invalid client state"),
127 ClientState::Disconnected if self.reconnect => {
128 // TODO: add timeout
129 let connect = tokio::spawn(client_login(
130 self.config.server.clone(),
131 self.config.jid.clone(),
132 self.config.password.clone(),
133 ));
134 self.state = ClientState::Connecting(connect);
135 self.poll_next(cx)
136 }
137 ClientState::Disconnected => {
138 self.state = ClientState::Disconnected;
139 Poll::Pending
140 }
141 ClientState::Connecting(mut connect) => match Pin::new(&mut connect).poll(cx) {
142 Poll::Ready(Ok(Ok(stream))) => {
143 let bound_jid = stream.jid.clone();
144 self.state = ClientState::Connected(stream);
145 Poll::Ready(Some(Event::Online {
146 bound_jid,
147 resumed: false,
148 }))
149 }
150 Poll::Ready(Ok(Err(e))) => {
151 self.state = ClientState::Disconnected;
152 return Poll::Ready(Some(Event::Disconnected(e.into())));
153 }
154 Poll::Ready(Err(e)) => {
155 self.state = ClientState::Disconnected;
156 panic!("connect task: {}", e);
157 }
158 Poll::Pending => {
159 self.state = ClientState::Connecting(connect);
160 Poll::Pending
161 }
162 },
163 ClientState::Connected(mut stream) => {
164 // Poll sink
165 match Pin::new(&mut stream).poll_ready(cx) {
166 Poll::Pending => (),
167 Poll::Ready(Ok(())) => (),
168 Poll::Ready(Err(e)) => {
169 self.state = ClientState::Disconnected;
170 return Poll::Ready(Some(Event::Disconnected(e.into())));
171 }
172 };
173
174 // Poll stream
175 //
176 // This needs to be a loop in order to ignore packets we don’t care about, or those
177 // we want to handle elsewhere. Returning something isn’t correct in those two
178 // cases because it would signal to tokio that the XMPPStream is also done, while
179 // there could be additional packets waiting for us.
180 //
181 // The proper solution is thus a loop which we exit once we have something to
182 // return.
183 loop {
184 match Pin::new(&mut stream).poll_next(cx) {
185 Poll::Ready(None) => {
186 // EOF
187 self.state = ClientState::Disconnected;
188 return Poll::Ready(Some(Event::Disconnected(Error::Disconnected)));
189 }
190 Poll::Ready(Some(Ok(Packet::Stanza(stanza)))) => {
191 // Receive stanza
192 self.state = ClientState::Connected(stream);
193 return Poll::Ready(Some(Event::Stanza(stanza)));
194 }
195 Poll::Ready(Some(Ok(Packet::Text(_)))) => {
196 // Ignore text between stanzas
197 }
198 Poll::Ready(Some(Ok(Packet::StreamStart(_)))) => {
199 // <stream:stream>
200 self.state = ClientState::Disconnected;
201 return Poll::Ready(Some(Event::Disconnected(
202 ProtocolError::InvalidStreamStart.into(),
203 )));
204 }
205 Poll::Ready(Some(Ok(Packet::StreamEnd))) => {
206 // End of stream: </stream:stream>
207 self.state = ClientState::Disconnected;
208 return Poll::Ready(Some(Event::Disconnected(Error::Disconnected)));
209 }
210 Poll::Pending => {
211 // Try again later
212 self.state = ClientState::Connected(stream);
213 return Poll::Pending;
214 }
215 Poll::Ready(Some(Err(e))) => {
216 self.state = ClientState::Disconnected;
217 return Poll::Ready(Some(Event::Disconnected(e.into())));
218 }
219 }
220 }
221 }
222 }
223 }
224}
225
226/// Outgoing XMPP packets
227///
228/// See `send_stanza()` for an `async fn`
229impl<C: ServerConnector> Sink<Packet> for Client<C> {
230 type Error = Error;
231
232 fn start_send(mut self: Pin<&mut Self>, item: Packet) -> Result<(), Self::Error> {
233 match self.state {
234 ClientState::Connected(ref mut stream) => {
235 Pin::new(stream).start_send(item).map_err(|e| e.into())
236 }
237 _ => Err(Error::InvalidState),
238 }
239 }
240
241 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
242 match self.state {
243 ClientState::Connected(ref mut stream) => {
244 Pin::new(stream).poll_ready(cx).map_err(|e| e.into())
245 }
246 _ => Poll::Pending,
247 }
248 }
249
250 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
251 match self.state {
252 ClientState::Connected(ref mut stream) => {
253 Pin::new(stream).poll_flush(cx).map_err(|e| e.into())
254 }
255 _ => Poll::Pending,
256 }
257 }
258
259 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
260 match self.state {
261 ClientState::Connected(ref mut stream) => {
262 Pin::new(stream).poll_close(cx).map_err(|e| e.into())
263 }
264 _ => Poll::Pending,
265 }
266 }
267}