1use futures::{sink, Async, Future, Poll, Stream};
2use std::mem::replace;
3use tokio_io::{AsyncRead, AsyncWrite};
4use xmpp_parsers::TryFrom;
5use xmpp_parsers::bind::Bind;
6use xmpp_parsers::iq::{Iq, IqType};
7
8use crate::xmpp_codec::Packet;
9use crate::xmpp_stream::XMPPStream;
10use crate::{Error, ProtocolError};
11
/// XML namespace in which the `bind` stream feature is looked up.
const NS_XMPP_BIND: &str = "urn:ietf:params:xml:ns:xmpp-bind";
/// Id put on the bind request IQ; the response is recognised by carrying the
/// same id.
const BIND_REQ_ID: &str = "resource-bind";
14
15pub enum ClientBind<S: AsyncWrite> {
16 Unsupported(XMPPStream<S>),
17 WaitSend(sink::Send<XMPPStream<S>>),
18 WaitRecv(XMPPStream<S>),
19 Invalid,
20}
21
22impl<S: AsyncWrite> ClientBind<S> {
23 /// Consumes and returns the stream to express that you cannot use
24 /// the stream for anything else until the resource binding
25 /// req/resp are done.
26 pub fn new(stream: XMPPStream<S>) -> Self {
27 match stream.stream_features.get_child("bind", NS_XMPP_BIND) {
28 None =>
29 // No resource binding available,
30 // return the (probably // usable) stream immediately
31 {
32 ClientBind::Unsupported(stream)
33 }
34 Some(_) => {
35 let resource = stream.jid.resource.clone();
36 let iq = Iq::from_set(BIND_REQ_ID, Bind::new(resource));
37 let send = stream.send_stanza(iq);
38 ClientBind::WaitSend(send)
39 }
40 }
41 }
42}
43
44impl<S: AsyncRead + AsyncWrite> Future for ClientBind<S> {
45 type Item = XMPPStream<S>;
46 type Error = Error;
47
48 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
49 let state = replace(self, ClientBind::Invalid);
50
51 match state {
52 ClientBind::Unsupported(stream) => Ok(Async::Ready(stream)),
53 ClientBind::WaitSend(mut send) => match send.poll() {
54 Ok(Async::Ready(stream)) => {
55 replace(self, ClientBind::WaitRecv(stream));
56 self.poll()
57 }
58 Ok(Async::NotReady) => {
59 replace(self, ClientBind::WaitSend(send));
60 Ok(Async::NotReady)
61 }
62 Err(e) => Err(e)?,
63 },
64 ClientBind::WaitRecv(mut stream) => match stream.poll() {
65 Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => match Iq::try_from(stanza) {
66 Ok(iq) => {
67 if iq.id == BIND_REQ_ID {
68 match iq.payload {
69 IqType::Result(payload) => {
70 payload
71 .and_then(|payload| Bind::try_from(payload).ok())
72 .map(|bind| match bind {
73 Bind::Jid(jid) => stream.jid = jid,
74 _ => {}
75 });
76 Ok(Async::Ready(stream))
77 }
78 _ => Err(ProtocolError::InvalidBindResponse)?,
79 }
80 } else {
81 Ok(Async::NotReady)
82 }
83 }
84 _ => Ok(Async::NotReady),
85 },
86 Ok(Async::Ready(_)) => {
87 replace(self, ClientBind::WaitRecv(stream));
88 self.poll()
89 }
90 Ok(Async::NotReady) => {
91 replace(self, ClientBind::WaitRecv(stream));
92 Ok(Async::NotReady)
93 }
94 Err(e) => Err(e)?,
95 },
96 ClientBind::Invalid => unreachable!(),
97 }
98 }
99}