bind.rs

 1use futures::{sink, Async, Future, Poll, Stream};
 2use std::mem::replace;
 3use tokio_io::{AsyncRead, AsyncWrite};
 4use xmpp_parsers::TryFrom;
 5use xmpp_parsers::bind::Bind;
 6use xmpp_parsers::iq::{Iq, IqType};
 7
 8use crate::xmpp_codec::Packet;
 9use crate::xmpp_stream::XMPPStream;
10use crate::{Error, ProtocolError};
11
12const NS_XMPP_BIND: &str = "urn:ietf:params:xml:ns:xmpp-bind";
13const BIND_REQ_ID: &str = "resource-bind";
14
15pub enum ClientBind<S: AsyncWrite> {
16    Unsupported(XMPPStream<S>),
17    WaitSend(sink::Send<XMPPStream<S>>),
18    WaitRecv(XMPPStream<S>),
19    Invalid,
20}
21
22impl<S: AsyncWrite> ClientBind<S> {
23    /// Consumes and returns the stream to express that you cannot use
24    /// the stream for anything else until the resource binding
25    /// req/resp are done.
26    pub fn new(stream: XMPPStream<S>) -> Self {
27        match stream.stream_features.get_child("bind", NS_XMPP_BIND) {
28            None =>
29            // No resource binding available,
30            // return the (probably // usable) stream immediately
31            {
32                ClientBind::Unsupported(stream)
33            }
34            Some(_) => {
35                let resource = stream.jid.resource.clone();
36                let iq = Iq::from_set(BIND_REQ_ID, Bind::new(resource));
37                let send = stream.send_stanza(iq);
38                ClientBind::WaitSend(send)
39            }
40        }
41    }
42}
43
44impl<S: AsyncRead + AsyncWrite> Future for ClientBind<S> {
45    type Item = XMPPStream<S>;
46    type Error = Error;
47
48    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
49        let state = replace(self, ClientBind::Invalid);
50
51        match state {
52            ClientBind::Unsupported(stream) => Ok(Async::Ready(stream)),
53            ClientBind::WaitSend(mut send) => match send.poll() {
54                Ok(Async::Ready(stream)) => {
55                    replace(self, ClientBind::WaitRecv(stream));
56                    self.poll()
57                }
58                Ok(Async::NotReady) => {
59                    replace(self, ClientBind::WaitSend(send));
60                    Ok(Async::NotReady)
61                }
62                Err(e) => Err(e)?,
63            },
64            ClientBind::WaitRecv(mut stream) => match stream.poll() {
65                Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => match Iq::try_from(stanza) {
66                    Ok(iq) => {
67                        if iq.id == BIND_REQ_ID {
68                            match iq.payload {
69                                IqType::Result(payload) => {
70                                    payload
71                                        .and_then(|payload| Bind::try_from(payload).ok())
72                                        .map(|bind| match bind {
73                                            Bind::Jid(jid) => stream.jid = jid,
74                                            _ => {}
75                                        });
76                                    Ok(Async::Ready(stream))
77                                }
78                                _ => Err(ProtocolError::InvalidBindResponse)?,
79                            }
80                        } else {
81                            Ok(Async::NotReady)
82                        }
83                    }
84                    _ => Ok(Async::NotReady),
85                },
86                Ok(Async::Ready(_)) => {
87                    replace(self, ClientBind::WaitRecv(stream));
88                    self.poll()
89                }
90                Ok(Async::NotReady) => {
91                    replace(self, ClientBind::WaitRecv(stream));
92                    Ok(Async::NotReady)
93                }
94                Err(e) => Err(e)?,
95            },
96            ClientBind::Invalid => unreachable!(),
97        }
98    }
99}