bind.rs

 1use futures::stream::StreamExt;
 2use tokio::io::{AsyncRead, AsyncWrite};
 3use xmpp_parsers::bind::{BindQuery, BindResponse};
 4use xmpp_parsers::iq::{Iq, IqType};
 5
 6use crate::xmpp_codec::Packet;
 7use crate::xmpp_stream::XMPPStream;
 8use crate::{Error, ProtocolError};
 9
/// Stanza `id` attached to the resource-binding IQ request; the same value is
/// compared against incoming IQ ids to recognise the matching response.
const BIND_REQ_ID: &str = "resource-bind";
11
12pub async fn bind<S: AsyncRead + AsyncWrite + Unpin>(
13    mut stream: XMPPStream<S>,
14) -> Result<XMPPStream<S>, Error> {
15    if stream.stream_features.can_bind() {
16        let resource = stream
17            .jid
18            .resource()
19            .and_then(|resource| Some(resource.to_string()));
20        let iq = Iq::from_set(BIND_REQ_ID, BindQuery::new(resource));
21        stream.send_stanza(iq).await?;
22
23        loop {
24            match stream.next().await {
25                Some(Ok(Packet::Stanza(stanza))) => match Iq::try_from(stanza) {
26                    Ok(iq) if iq.id == BIND_REQ_ID => match iq.payload {
27                        IqType::Result(payload) => {
28                            payload
29                                .and_then(|payload| BindResponse::try_from(payload).ok())
30                                .map(|bind| stream.jid = bind.into());
31                            return Ok(stream);
32                        }
33                        _ => return Err(ProtocolError::InvalidBindResponse.into()),
34                    },
35                    _ => {}
36                },
37                Some(Ok(_)) => {}
38                Some(Err(e)) => return Err(e),
39                None => return Err(Error::Disconnected),
40            }
41        }
42    } else {
43        // No resource binding available,
44        // return the (probably // usable) stream immediately
45        return Ok(stream);
46    }
47}