bind.rs

  1use std::mem::replace;
  2use std::error::Error;
  3use std::str::FromStr;
  4use futures::*;
  5use futures::sink;
  6use tokio_io::{AsyncRead, AsyncWrite};
  7use xml;
  8use jid::Jid;
  9
 10use xmpp_codec::*;
 11use xmpp_stream::*;
 12
 13const NS_XMPP_BIND: &str = "urn:ietf:params:xml:ns:xmpp-bind";
 14const BIND_REQ_ID: &str = "resource-bind";
 15
 16pub enum ClientBind<S: AsyncWrite> {
 17    Unsupported(XMPPStream<S>),
 18    WaitSend(sink::Send<XMPPStream<S>>),
 19    WaitRecv(XMPPStream<S>),
 20    Invalid,
 21}
 22
 23impl<S: AsyncWrite> ClientBind<S> {
 24    /// Consumes and returns the stream to express that you cannot use
 25    /// the stream for anything else until the resource binding
 26    /// req/resp are done.
 27    pub fn new(stream: XMPPStream<S>) -> Self {
 28        match stream.stream_features.get_child("bind", Some(NS_XMPP_BIND)) {
 29            None =>
 30                // No resource binding available,
 31                // return the (probably // usable) stream immediately
 32                ClientBind::Unsupported(stream),
 33            Some(_) => {
 34                let iq = make_bind_request(stream.jid.resource.as_ref());
 35                let send = stream.send(Packet::Stanza(iq));
 36                ClientBind::WaitSend(send)
 37            },
 38        }
 39    }
 40}
 41
 42fn make_bind_request(resource: Option<&String>) -> xml::Element {
 43    let mut iq = xml::Element::new(
 44        "iq".to_owned(),
 45        None,
 46        vec![("type".to_owned(), None, "set".to_owned()),
 47             ("id".to_owned(), None, BIND_REQ_ID.to_owned())]
 48    );
 49    {
 50        let bind_el = iq.tag(
 51            xml::Element::new(
 52                "bind".to_owned(),
 53                Some(NS_XMPP_BIND.to_owned()),
 54                vec![]
 55            ));
 56        resource.map(|resource| {
 57            let resource_el = bind_el.tag(
 58                xml::Element::new(
 59                    "resource".to_owned(),
 60                    Some(NS_XMPP_BIND.to_owned()),
 61                    vec![]
 62                ));
 63            resource_el.text(resource.clone());
 64        });
 65    }
 66    iq
 67}
 68
 69impl<S: AsyncRead + AsyncWrite> Future for ClientBind<S> {
 70    type Item = XMPPStream<S>;
 71    type Error = String;
 72
 73    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
 74        let state = replace(self, ClientBind::Invalid);
 75
 76        match state {
 77            ClientBind::Unsupported(stream) =>
 78                Ok(Async::Ready(stream)),
 79            ClientBind::WaitSend(mut send) => {
 80                match send.poll() {
 81                    Ok(Async::Ready(stream)) => {
 82                        replace(self, ClientBind::WaitRecv(stream));
 83                        self.poll()
 84                    },
 85                    Ok(Async::NotReady) => {
 86                        replace(self, ClientBind::WaitSend(send));
 87                        Ok(Async::NotReady)
 88                    },
 89                    Err(e) =>
 90                        Err(e.description().to_owned()),
 91                }
 92            },
 93            ClientBind::WaitRecv(mut stream) => {
 94                match stream.poll() {
 95                    Ok(Async::Ready(Some(Packet::Stanza(ref iq))))
 96                        if iq.name == "iq"
 97                        && iq.get_attribute("id", None) == Some(BIND_REQ_ID) => {
 98                            match iq.get_attribute("type", None) {
 99                                Some("result") => {
100                                    get_bind_response_jid(&iq)
101                                        .map(|jid| stream.jid = jid);
102                                    Ok(Async::Ready(stream))
103                                },
104                                _ =>
105                                    Err("resource bind response".to_owned()),
106                            }
107                        },
108                    Ok(Async::Ready(_)) => {
109                        replace(self, ClientBind::WaitRecv(stream));
110                        self.poll()
111                    },
112                    Ok(Async::NotReady) => {
113                        replace(self, ClientBind::WaitRecv(stream));
114                        Ok(Async::NotReady)
115                    },
116                    Err(e) =>
117                        Err(e.description().to_owned()),
118                }
119            },
120            ClientBind::Invalid =>
121                unreachable!(),
122        }
123    }
124}
125
126fn get_bind_response_jid(iq: &xml::Element) -> Option<Jid> {
127    iq.get_child("bind", Some(NS_XMPP_BIND))
128        .and_then(|bind_el|
129                  bind_el.get_child("jid", Some(NS_XMPP_BIND))
130        )
131        .and_then(|jid_el|
132                  Jid::from_str(&jid_el.content_str())
133                  .ok()
134        )
135}