bind.rs

  1use futures::{sink, Async, Future, Poll, Stream};
  2use std::convert::TryFrom;
  3use std::mem::replace;
  4use tokio_io::{AsyncRead, AsyncWrite};
  5use xmpp_parsers::Jid;
  6use xmpp_parsers::bind::{BindQuery, BindResponse};
  7use xmpp_parsers::iq::{Iq, IqType};
  8
  9use crate::xmpp_codec::Packet;
 10use crate::xmpp_stream::XMPPStream;
 11use crate::{Error, ProtocolError};
 12
 13const NS_XMPP_BIND: &str = "urn:ietf:params:xml:ns:xmpp-bind";
 14const BIND_REQ_ID: &str = "resource-bind";
 15
 16pub enum ClientBind<S: AsyncWrite> {
 17    Unsupported(XMPPStream<S>),
 18    WaitSend(sink::Send<XMPPStream<S>>),
 19    WaitRecv(XMPPStream<S>),
 20    Invalid,
 21}
 22
 23impl<S: AsyncWrite> ClientBind<S> {
 24    /// Consumes and returns the stream to express that you cannot use
 25    /// the stream for anything else until the resource binding
 26    /// req/resp are done.
 27    pub fn new(stream: XMPPStream<S>) -> Self {
 28        match stream.stream_features.get_child("bind", NS_XMPP_BIND) {
 29            None =>
 30            // No resource binding available,
 31            // return the (probably // usable) stream immediately
 32            {
 33                ClientBind::Unsupported(stream)
 34            }
 35            Some(_) => {
 36                let resource;
 37                if let Jid::Full(jid) = stream.jid.clone() {
 38                    resource = Some(jid.resource);
 39                } else {
 40                    resource = None;
 41                }
 42                let iq = Iq::from_set(BIND_REQ_ID, BindQuery::new(resource));
 43                let send = stream.send_stanza(iq);
 44                ClientBind::WaitSend(send)
 45            }
 46        }
 47    }
 48}
 49
 50impl<S: AsyncRead + AsyncWrite> Future for ClientBind<S> {
 51    type Item = XMPPStream<S>;
 52    type Error = Error;
 53
 54    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
 55        let state = replace(self, ClientBind::Invalid);
 56
 57        match state {
 58            ClientBind::Unsupported(stream) => Ok(Async::Ready(stream)),
 59            ClientBind::WaitSend(mut send) => match send.poll() {
 60                Ok(Async::Ready(stream)) => {
 61                    replace(self, ClientBind::WaitRecv(stream));
 62                    self.poll()
 63                }
 64                Ok(Async::NotReady) => {
 65                    replace(self, ClientBind::WaitSend(send));
 66                    Ok(Async::NotReady)
 67                }
 68                Err(e) => Err(e)?,
 69            },
 70            ClientBind::WaitRecv(mut stream) => match stream.poll() {
 71                Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => match Iq::try_from(stanza) {
 72                    Ok(iq) => {
 73                        if iq.id == BIND_REQ_ID {
 74                            match iq.payload {
 75                                IqType::Result(payload) => {
 76                                    payload
 77                                        .and_then(|payload| BindResponse::try_from(payload).ok())
 78                                        .map(|bind| stream.jid = bind.into());
 79                                    Ok(Async::Ready(stream))
 80                                }
 81                                _ => Err(ProtocolError::InvalidBindResponse)?,
 82                            }
 83                        } else {
 84                            Ok(Async::NotReady)
 85                        }
 86                    }
 87                    _ => Ok(Async::NotReady),
 88                },
 89                Ok(Async::Ready(_)) => {
 90                    replace(self, ClientBind::WaitRecv(stream));
 91                    self.poll()
 92                }
 93                Ok(Async::NotReady) => {
 94                    replace(self, ClientBind::WaitRecv(stream));
 95                    Ok(Async::NotReady)
 96                }
 97                Err(e) => Err(e)?,
 98            },
 99            ClientBind::Invalid => unreachable!(),
100        }
101    }
102}