1use std::mem::replace;
2use futures::{Future, Poll, Async, sink, Stream};
3use tokio_io::{AsyncRead, AsyncWrite};
4use xmpp_parsers::iq::{Iq, IqType};
5use xmpp_parsers::bind::Bind;
6use try_from::TryFrom;
7
8use crate::xmpp_codec::Packet;
9use crate::xmpp_stream::XMPPStream;
10use crate::{Error, ProtocolError};
11
/// XML namespace used to look up the `bind` child in the stream features.
const NS_XMPP_BIND: &str = "urn:ietf:params:xml:ns:xmpp-bind";
/// Id attached to the bind `<iq/>` request; incoming iqs are matched
/// against it to recognise the response.
const BIND_REQ_ID: &str = "resource-bind";
14
15pub enum ClientBind<S: AsyncWrite> {
16 Unsupported(XMPPStream<S>),
17 WaitSend(sink::Send<XMPPStream<S>>),
18 WaitRecv(XMPPStream<S>),
19 Invalid,
20}
21
22impl<S: AsyncWrite> ClientBind<S> {
23 /// Consumes and returns the stream to express that you cannot use
24 /// the stream for anything else until the resource binding
25 /// req/resp are done.
26 pub fn new(stream: XMPPStream<S>) -> Self {
27 match stream.stream_features.get_child("bind", NS_XMPP_BIND) {
28 None =>
29 // No resource binding available,
30 // return the (probably // usable) stream immediately
31 ClientBind::Unsupported(stream),
32 Some(_) => {
33 let resource = stream.jid.resource.clone();
34 let iq = Iq::from_set(Bind::new(resource))
35 .with_id(BIND_REQ_ID.to_string());
36 let send = stream.send_stanza(iq);
37 ClientBind::WaitSend(send)
38 },
39 }
40 }
41}
42
43impl<S: AsyncRead + AsyncWrite> Future for ClientBind<S> {
44 type Item = XMPPStream<S>;
45 type Error = Error;
46
47 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
48 let state = replace(self, ClientBind::Invalid);
49
50 match state {
51 ClientBind::Unsupported(stream) =>
52 Ok(Async::Ready(stream)),
53 ClientBind::WaitSend(mut send) => {
54 match send.poll() {
55 Ok(Async::Ready(stream)) => {
56 replace(self, ClientBind::WaitRecv(stream));
57 self.poll()
58 },
59 Ok(Async::NotReady) => {
60 replace(self, ClientBind::WaitSend(send));
61 Ok(Async::NotReady)
62 },
63 Err(e) =>
64 Err(e)?
65 }
66 },
67 ClientBind::WaitRecv(mut stream) => {
68 match stream.poll() {
69 Ok(Async::Ready(Some(Packet::Stanza(stanza)))) =>
70 match Iq::try_from(stanza) {
71 Ok(iq) => if iq.id == Some(BIND_REQ_ID.to_string()) {
72 match iq.payload {
73 IqType::Result(payload) => {
74 payload
75 .and_then(|payload| Bind::try_from(payload).ok())
76 .map(|bind| match bind {
77 Bind::Jid(jid) => stream.jid = jid,
78 _ => {}
79 });
80 Ok(Async::Ready(stream))
81 },
82 _ =>
83 Err(ProtocolError::InvalidBindResponse)?,
84 }
85 } else {
86 Ok(Async::NotReady)
87 },
88 _ => Ok(Async::NotReady),
89 },
90 Ok(Async::Ready(_)) => {
91 replace(self, ClientBind::WaitRecv(stream));
92 self.poll()
93 },
94 Ok(Async::NotReady) => {
95 replace(self, ClientBind::WaitRecv(stream));
96 Ok(Async::NotReady)
97 },
98 Err(e) =>
99 Err(e)?,
100 }
101 },
102 ClientBind::Invalid =>
103 unreachable!(),
104 }
105 }
106}