stream_start.rs

  1use futures::{sink, Async, Future, Poll, Sink, Stream};
  2use jid::Jid;
  3use minidom::Element;
  4use std::mem::replace;
  5use tokio_codec::Framed;
  6use tokio_io::{AsyncRead, AsyncWrite};
  7
  8use crate::xmpp_codec::{Packet, XMPPCodec};
  9use crate::xmpp_stream::XMPPStream;
 10use crate::{Error, ProtocolError};
 11
/// Namespace URI advertised as `xmlns:stream` in the outgoing stream header
/// and used to recognise the `features` stanza sent back by the peer.
const NS_XMPP_STREAM: &str = "http://etherx.jabber.org/streams";
 13
 14pub struct StreamStart<S: AsyncWrite> {
 15    state: StreamStartState<S>,
 16    jid: Jid,
 17    ns: String,
 18}
 19
 20enum StreamStartState<S: AsyncWrite> {
 21    SendStart(sink::Send<Framed<S, XMPPCodec>>),
 22    RecvStart(Framed<S, XMPPCodec>),
 23    RecvFeatures(Framed<S, XMPPCodec>, String),
 24    Invalid,
 25}
 26
 27impl<S: AsyncWrite> StreamStart<S> {
 28    pub fn from_stream(stream: Framed<S, XMPPCodec>, jid: Jid, ns: String) -> Self {
 29        let attrs = [
 30            ("to".to_owned(), jid.domain.clone()),
 31            ("version".to_owned(), "1.0".to_owned()),
 32            ("xmlns".to_owned(), ns.clone()),
 33            ("xmlns:stream".to_owned(), NS_XMPP_STREAM.to_owned()),
 34        ]
 35        .iter()
 36        .cloned()
 37        .collect();
 38        let send = stream.send(Packet::StreamStart(attrs));
 39
 40        StreamStart {
 41            state: StreamStartState::SendStart(send),
 42            jid,
 43            ns,
 44        }
 45    }
 46}
 47
 48impl<S: AsyncRead + AsyncWrite> Future for StreamStart<S> {
 49    type Item = XMPPStream<S>;
 50    type Error = Error;
 51
 52    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
 53        let old_state = replace(&mut self.state, StreamStartState::Invalid);
 54        let mut retry = false;
 55
 56        let (new_state, result) = match old_state {
 57            StreamStartState::SendStart(mut send) => match send.poll() {
 58                Ok(Async::Ready(stream)) => {
 59                    retry = true;
 60                    (StreamStartState::RecvStart(stream), Ok(Async::NotReady))
 61                }
 62                Ok(Async::NotReady) => (StreamStartState::SendStart(send), Ok(Async::NotReady)),
 63                Err(e) => (StreamStartState::Invalid, Err(e.into())),
 64            },
 65            StreamStartState::RecvStart(mut stream) => match stream.poll() {
 66                Ok(Async::Ready(Some(Packet::StreamStart(stream_attrs)))) => {
 67                    let stream_ns = stream_attrs
 68                        .get("xmlns")
 69                        .ok_or(ProtocolError::NoStreamNamespace)?
 70                        .clone();
 71                    if self.ns == "jabber:client" {
 72                        retry = true;
 73                        // TODO: skip RecvFeatures for version < 1.0
 74                        (
 75                            StreamStartState::RecvFeatures(stream, stream_ns),
 76                            Ok(Async::NotReady),
 77                        )
 78                    } else {
 79                        let id = stream_attrs
 80                            .get("id")
 81                            .ok_or(ProtocolError::NoStreamId)?
 82                            .clone();
 83                        // FIXME: huge hack, shouldn’t be an element!
 84                        let stream = XMPPStream::new(
 85                            self.jid.clone(),
 86                            stream,
 87                            self.ns.clone(),
 88                            Element::builder(id).build(),
 89                        );
 90                        (StreamStartState::Invalid, Ok(Async::Ready(stream)))
 91                    }
 92                }
 93                Ok(Async::Ready(_)) => return Err(ProtocolError::InvalidToken.into()),
 94                Ok(Async::NotReady) => (StreamStartState::RecvStart(stream), Ok(Async::NotReady)),
 95                Err(e) => return Err(ProtocolError::from(e).into()),
 96            },
 97            StreamStartState::RecvFeatures(mut stream, stream_ns) => match stream.poll() {
 98                Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
 99                    if stanza.is("features", NS_XMPP_STREAM) {
100                        let stream =
101                            XMPPStream::new(self.jid.clone(), stream, self.ns.clone(), stanza);
102                        (StreamStartState::Invalid, Ok(Async::Ready(stream)))
103                    } else {
104                        (
105                            StreamStartState::RecvFeatures(stream, stream_ns),
106                            Ok(Async::NotReady),
107                        )
108                    }
109                }
110                Ok(Async::Ready(_)) | Ok(Async::NotReady) => (
111                    StreamStartState::RecvFeatures(stream, stream_ns),
112                    Ok(Async::NotReady),
113                ),
114                Err(e) => return Err(ProtocolError::from(e).into()),
115            },
116            StreamStartState::Invalid => unreachable!(),
117        };
118
119        self.state = new_state;
120        if retry {
121            self.poll()
122        } else {
123            result
124        }
125    }
126}