1use futures::{sink, Async, Future, Poll, Stream};
2use std::convert::TryFrom;
3use std::mem::replace;
4use tokio_io::{AsyncRead, AsyncWrite};
5use xmpp_parsers::bind::{BindQuery, BindResponse};
6use xmpp_parsers::iq::{Iq, IqType};
7use xmpp_parsers::Jid;
8
9use crate::xmpp_codec::Packet;
10use crate::xmpp_stream::XMPPStream;
11use crate::{Error, ProtocolError};
12
/// XML namespace of the resource-binding feature; used to look up the
/// `bind` child element in the stream features.
const NS_XMPP_BIND: &str = "urn:ietf:params:xml:ns:xmpp-bind";
/// Fixed `id` put on the bind `<iq/>` request; incoming iq stanzas are
/// compared against it to recognise the bind response.
const BIND_REQ_ID: &str = "resource-bind";
15
/// State machine that performs resource binding on an [`XMPPStream`].
///
/// Each variant owns the stream (or the in-flight send holding it) for
/// the step that is currently in progress.
pub enum ClientBind<S: AsyncWrite> {
    /// The stream features contain no `bind` element; the stream is
    /// handed back as-is.
    Unsupported(XMPPStream<S>),
    /// The bind request stanza is being sent.
    WaitSend(sink::Send<XMPPStream<S>>),
    /// The request has been sent; waiting for an iq whose id matches it.
    WaitRecv(XMPPStream<S>),
    /// Placeholder stored in `self` while `poll` has taken ownership of
    /// the real state.
    Invalid,
}
22
23impl<S: AsyncWrite> ClientBind<S> {
24 /// Consumes and returns the stream to express that you cannot use
25 /// the stream for anything else until the resource binding
26 /// req/resp are done.
27 pub fn new(stream: XMPPStream<S>) -> Self {
28 match stream.stream_features.get_child("bind", NS_XMPP_BIND) {
29 None =>
30 // No resource binding available,
31 // return the (probably // usable) stream immediately
32 {
33 ClientBind::Unsupported(stream)
34 }
35 Some(_) => {
36 let resource;
37 if let Jid::Full(jid) = stream.jid.clone() {
38 resource = Some(jid.resource);
39 } else {
40 resource = None;
41 }
42 let iq = Iq::from_set(BIND_REQ_ID, BindQuery::new(resource));
43 let send = stream.send_stanza(iq);
44 ClientBind::WaitSend(send)
45 }
46 }
47 }
48}
49
50impl<S: AsyncRead + AsyncWrite> Future for ClientBind<S> {
51 type Item = XMPPStream<S>;
52 type Error = Error;
53
54 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
55 let state = replace(self, ClientBind::Invalid);
56
57 match state {
58 ClientBind::Unsupported(stream) => Ok(Async::Ready(stream)),
59 ClientBind::WaitSend(mut send) => match send.poll() {
60 Ok(Async::Ready(stream)) => {
61 replace(self, ClientBind::WaitRecv(stream));
62 self.poll()
63 }
64 Ok(Async::NotReady) => {
65 replace(self, ClientBind::WaitSend(send));
66 Ok(Async::NotReady)
67 }
68 Err(e) => Err(e)?,
69 },
70 ClientBind::WaitRecv(mut stream) => match stream.poll() {
71 Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => match Iq::try_from(stanza) {
72 Ok(iq) => {
73 if iq.id == BIND_REQ_ID {
74 match iq.payload {
75 IqType::Result(payload) => {
76 payload
77 .and_then(|payload| BindResponse::try_from(payload).ok())
78 .map(|bind| stream.jid = bind.into());
79 Ok(Async::Ready(stream))
80 }
81 _ => Err(ProtocolError::InvalidBindResponse)?,
82 }
83 } else {
84 Ok(Async::NotReady)
85 }
86 }
87 _ => Ok(Async::NotReady),
88 },
89 Ok(Async::Ready(_)) => {
90 replace(self, ClientBind::WaitRecv(stream));
91 self.poll()
92 }
93 Ok(Async::NotReady) => {
94 replace(self, ClientBind::WaitRecv(stream));
95 Ok(Async::NotReady)
96 }
97 Err(e) => Err(e)?,
98 },
99 ClientBind::Invalid => unreachable!(),
100 }
101 }
102}