1use std::mem::replace;
2use std::error::Error;
3use std::str::FromStr;
4use futures::{Future, Poll, Async, sink, Sink, Stream};
5use tokio_io::{AsyncRead, AsyncWrite};
6use jid::Jid;
7use minidom::Element;
8use xmpp_parsers::bind::Bind;
9
10use xmpp_codec::Packet;
11use xmpp_stream::XMPPStream;
12
/// XML namespace used when looking up the `<bind/>` element (in the stream
/// features and in the server's reply) and its `<jid/>` child.
const NS_XMPP_BIND: &str = "urn:ietf:params:xml:ns:xmpp-bind";
/// `id` attribute value that `ClientBind::poll` expects on the `<iq/>` stanza
/// answering the bind request.
const BIND_REQ_ID: &str = "resource-bind";
15
16pub enum ClientBind<S: AsyncWrite> {
17 Unsupported(XMPPStream<S>),
18 WaitSend(sink::Send<XMPPStream<S>>),
19 WaitRecv(XMPPStream<S>),
20 Invalid,
21}
22
23impl<S: AsyncWrite> ClientBind<S> {
24 /// Consumes and returns the stream to express that you cannot use
25 /// the stream for anything else until the resource binding
26 /// req/resp are done.
27 pub fn new(stream: XMPPStream<S>) -> Self {
28 match stream.stream_features.get_child("bind", NS_XMPP_BIND) {
29 None =>
30 // No resource binding available,
31 // return the (probably // usable) stream immediately
32 ClientBind::Unsupported(stream),
33 Some(_) => {
34 let resource = stream.jid.resource.clone();
35 let iq = Element::from(
36 Bind::new(resource)
37 );
38 let send = stream.send(Packet::Stanza(iq));
39 ClientBind::WaitSend(send)
40 },
41 }
42 }
43}
44
45impl<S: AsyncRead + AsyncWrite> Future for ClientBind<S> {
46 type Item = XMPPStream<S>;
47 type Error = String;
48
49 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
50 let state = replace(self, ClientBind::Invalid);
51
52 match state {
53 ClientBind::Unsupported(stream) =>
54 Ok(Async::Ready(stream)),
55 ClientBind::WaitSend(mut send) => {
56 match send.poll() {
57 Ok(Async::Ready(stream)) => {
58 replace(self, ClientBind::WaitRecv(stream));
59 self.poll()
60 },
61 Ok(Async::NotReady) => {
62 replace(self, ClientBind::WaitSend(send));
63 Ok(Async::NotReady)
64 },
65 Err(e) =>
66 Err(e.description().to_owned()),
67 }
68 },
69 ClientBind::WaitRecv(mut stream) => {
70 match stream.poll() {
71 Ok(Async::Ready(Some(Packet::Stanza(ref iq))))
72 if iq.name() == "iq"
73 && iq.attr("id") == Some(BIND_REQ_ID) => {
74 match iq.attr("type") {
75 Some("result") => {
76 get_bind_response_jid(iq)
77 .map(|jid| stream.jid = jid);
78 Ok(Async::Ready(stream))
79 },
80 _ =>
81 Err("resource bind response".to_owned()),
82 }
83 },
84 Ok(Async::Ready(_)) => {
85 replace(self, ClientBind::WaitRecv(stream));
86 self.poll()
87 },
88 Ok(Async::NotReady) => {
89 replace(self, ClientBind::WaitRecv(stream));
90 Ok(Async::NotReady)
91 },
92 Err(e) =>
93 Err(e.description().to_owned()),
94 }
95 },
96 ClientBind::Invalid =>
97 unreachable!(),
98 }
99 }
100}
101
102fn get_bind_response_jid(iq: &Element) -> Option<Jid> {
103 iq.get_child("bind", NS_XMPP_BIND)
104 .and_then(|bind_el|
105 bind_el.get_child("jid", NS_XMPP_BIND)
106 )
107 .and_then(|jid_el|
108 Jid::from_str(&jid_el.text())
109 .ok()
110 )
111}