1use futures::{sink::SinkExt, task::Poll, Future, Sink, Stream};
2use std::mem::replace;
3use std::pin::Pin;
4use std::task::Context;
5use tokio::task::JoinHandle;
6use xmpp_parsers::{ns, Element, Jid};
7
8use super::connect::client_login;
9use crate::connect::{AsyncReadAndWrite, ServerConnector};
10use crate::event::Event;
11use crate::stream_features::StreamFeatures;
12use crate::xmpp_codec::Packet;
13use crate::xmpp_stream::{add_stanza_id, XMPPStream};
14use crate::{Error, ProtocolError};
15
16/// XMPP client connection and state
17///
18/// It is able to reconnect. TODO: implement session management.
19///
20/// This implements the `futures` crate's [`Stream`](#impl-Stream) and
21/// [`Sink`](#impl-Sink<Packet>) traits.
22pub struct Client<C: ServerConnector> {
23 config: Config<C>,
24 state: ClientState<C::Stream>,
25 reconnect: bool,
26 // TODO: tls_required=true
27}
28
29/// XMPP client configuration
30#[derive(Clone, Debug)]
31pub struct Config<C> {
32 /// jid of the account
33 pub jid: Jid,
34 /// password of the account
35 pub password: String,
36 /// server configuration for the account
37 pub server: C,
38}
39
40enum ClientState<S: AsyncReadAndWrite> {
41 Invalid,
42 Disconnected,
43 Connecting(JoinHandle<Result<XMPPStream<S>, Error>>),
44 Connected(XMPPStream<S>),
45}
46
47impl<C: ServerConnector> Client<C> {
48 /// Start a new client given that the JID is already parsed.
49 pub fn new_with_config(config: Config<C>) -> Self {
50 let connect = tokio::spawn(client_login(
51 config.server.clone(),
52 config.jid.clone(),
53 config.password.clone(),
54 ));
55 let client = Client {
56 config,
57 state: ClientState::Connecting(connect),
58 reconnect: false,
59 };
60 client
61 }
62
63 /// Set whether to reconnect (`true`) or let the stream end
64 /// (`false`) when a connection to the server has ended.
65 pub fn set_reconnect(&mut self, reconnect: bool) -> &mut Self {
66 self.reconnect = reconnect;
67 self
68 }
69
70 /// Get the client's bound JID (the one reported by the XMPP
71 /// server).
72 pub fn bound_jid(&self) -> Option<&Jid> {
73 match self.state {
74 ClientState::Connected(ref stream) => Some(&stream.jid),
75 _ => None,
76 }
77 }
78
79 /// Send stanza
80 pub async fn send_stanza(&mut self, stanza: Element) -> Result<(), Error> {
81 self.send(Packet::Stanza(add_stanza_id(stanza, ns::JABBER_CLIENT)))
82 .await
83 }
84
85 /// Get the stream features (`<stream:features/>`) of the underlying stream
86 pub fn get_stream_features(&self) -> Option<&StreamFeatures> {
87 match self.state {
88 ClientState::Connected(ref stream) => Some(&stream.stream_features),
89 _ => None,
90 }
91 }
92
93 /// End connection by sending `</stream:stream>`
94 ///
95 /// You may expect the server to respond with the same. This
96 /// client will then drop its connection.
97 ///
98 /// Make sure to disable reconnect.
99 pub async fn send_end(&mut self) -> Result<(), Error> {
100 self.send(Packet::StreamEnd).await
101 }
102}
103
104/// Incoming XMPP events
105///
106/// In an `async fn` you may want to use this with `use
107/// futures::stream::StreamExt;`
108impl<C: ServerConnector> Stream for Client<C> {
109 type Item = Event;
110
111 /// Low-level read on the XMPP stream, allowing the underlying
112 /// machinery to:
113 ///
114 /// * connect,
115 /// * starttls,
116 /// * authenticate,
117 /// * bind a session, and finally
118 /// * receive stanzas
119 ///
120 /// ...for your client
121 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
122 let state = replace(&mut self.state, ClientState::Invalid);
123
124 match state {
125 ClientState::Invalid => panic!("Invalid client state"),
126 ClientState::Disconnected if self.reconnect => {
127 // TODO: add timeout
128 let connect = tokio::spawn(client_login(
129 self.config.server.clone(),
130 self.config.jid.clone(),
131 self.config.password.clone(),
132 ));
133 self.state = ClientState::Connecting(connect);
134 self.poll_next(cx)
135 }
136 ClientState::Disconnected => {
137 self.state = ClientState::Disconnected;
138 Poll::Pending
139 }
140 ClientState::Connecting(mut connect) => match Pin::new(&mut connect).poll(cx) {
141 Poll::Ready(Ok(Ok(stream))) => {
142 let bound_jid = stream.jid.clone();
143 self.state = ClientState::Connected(stream);
144 Poll::Ready(Some(Event::Online {
145 bound_jid,
146 resumed: false,
147 }))
148 }
149 Poll::Ready(Ok(Err(e))) => {
150 self.state = ClientState::Disconnected;
151 return Poll::Ready(Some(Event::Disconnected(e.into())));
152 }
153 Poll::Ready(Err(e)) => {
154 self.state = ClientState::Disconnected;
155 panic!("connect task: {}", e);
156 }
157 Poll::Pending => {
158 self.state = ClientState::Connecting(connect);
159 Poll::Pending
160 }
161 },
162 ClientState::Connected(mut stream) => {
163 // Poll sink
164 match Pin::new(&mut stream).poll_ready(cx) {
165 Poll::Pending => (),
166 Poll::Ready(Ok(())) => (),
167 Poll::Ready(Err(e)) => {
168 self.state = ClientState::Disconnected;
169 return Poll::Ready(Some(Event::Disconnected(e.into())));
170 }
171 };
172
173 // Poll stream
174 //
175 // This needs to be a loop in order to ignore packets we don’t care about, or those
176 // we want to handle elsewhere. Returning something isn’t correct in those two
177 // cases because it would signal to tokio that the XMPPStream is also done, while
178 // there could be additional packets waiting for us.
179 //
180 // The proper solution is thus a loop which we exit once we have something to
181 // return.
182 loop {
183 match Pin::new(&mut stream).poll_next(cx) {
184 Poll::Ready(None) => {
185 // EOF
186 self.state = ClientState::Disconnected;
187 return Poll::Ready(Some(Event::Disconnected(Error::Disconnected)));
188 }
189 Poll::Ready(Some(Ok(Packet::Stanza(stanza)))) => {
190 // Receive stanza
191 self.state = ClientState::Connected(stream);
192 return Poll::Ready(Some(Event::Stanza(stanza)));
193 }
194 Poll::Ready(Some(Ok(Packet::Text(_)))) => {
195 // Ignore text between stanzas
196 }
197 Poll::Ready(Some(Ok(Packet::StreamStart(_)))) => {
198 // <stream:stream>
199 self.state = ClientState::Disconnected;
200 return Poll::Ready(Some(Event::Disconnected(
201 ProtocolError::InvalidStreamStart.into(),
202 )));
203 }
204 Poll::Ready(Some(Ok(Packet::StreamEnd))) => {
205 // End of stream: </stream:stream>
206 self.state = ClientState::Disconnected;
207 return Poll::Ready(Some(Event::Disconnected(Error::Disconnected)));
208 }
209 Poll::Pending => {
210 // Try again later
211 self.state = ClientState::Connected(stream);
212 return Poll::Pending;
213 }
214 Poll::Ready(Some(Err(e))) => {
215 self.state = ClientState::Disconnected;
216 return Poll::Ready(Some(Event::Disconnected(e.into())));
217 }
218 }
219 }
220 }
221 }
222 }
223}
224
225/// Outgoing XMPP packets
226///
227/// See `send_stanza()` for an `async fn`
228impl<C: ServerConnector> Sink<Packet> for Client<C> {
229 type Error = Error;
230
231 fn start_send(mut self: Pin<&mut Self>, item: Packet) -> Result<(), Self::Error> {
232 match self.state {
233 ClientState::Connected(ref mut stream) => {
234 Pin::new(stream).start_send(item).map_err(|e| e.into())
235 }
236 _ => Err(Error::InvalidState),
237 }
238 }
239
240 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
241 match self.state {
242 ClientState::Connected(ref mut stream) => {
243 Pin::new(stream).poll_ready(cx).map_err(|e| e.into())
244 }
245 _ => Poll::Pending,
246 }
247 }
248
249 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
250 match self.state {
251 ClientState::Connected(ref mut stream) => {
252 Pin::new(stream).poll_flush(cx).map_err(|e| e.into())
253 }
254 _ => Poll::Pending,
255 }
256 }
257
258 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
259 match self.state {
260 ClientState::Connected(ref mut stream) => {
261 Pin::new(stream).poll_close(cx).map_err(|e| e.into())
262 }
263 _ => Poll::Pending,
264 }
265 }
266}