stream_start.rs

  1use std::mem::replace;
  2// use std::error::Error as StdError;
  3// use std::{fmt, io};
  4use futures::{Future, Async, Poll, Stream, sink, Sink};
  5use tokio_io::{AsyncRead, AsyncWrite};
  6use tokio_codec::Framed;
  7use jid::Jid;
  8use minidom::Element;
  9
 10use xmpp_codec::{XMPPCodec, Packet};
 11use xmpp_stream::XMPPStream;
 12use {Error, ProtocolError};
 13
 14const NS_XMPP_STREAM: &str = "http://etherx.jabber.org/streams";
 15
 16pub struct StreamStart<S: AsyncWrite> {
 17    state: StreamStartState<S>,
 18    jid: Jid,
 19    ns: String,
 20}
 21
 22enum StreamStartState<S: AsyncWrite> {
 23    SendStart(sink::Send<Framed<S, XMPPCodec>>),
 24    RecvStart(Framed<S, XMPPCodec>),
 25    RecvFeatures(Framed<S, XMPPCodec>, String),
 26    Invalid,
 27}
 28
 29impl<S: AsyncWrite> StreamStart<S> {
 30    pub fn from_stream(stream: Framed<S, XMPPCodec>, jid: Jid, ns: String) -> Self {
 31        let attrs = [("to".to_owned(), jid.domain.clone()),
 32                     ("version".to_owned(), "1.0".to_owned()),
 33                     ("xmlns".to_owned(), ns.clone()),
 34                     ("xmlns:stream".to_owned(), NS_XMPP_STREAM.to_owned()),
 35        ].iter().cloned().collect();
 36        let send = stream.send(Packet::StreamStart(attrs));
 37
 38        StreamStart {
 39            state: StreamStartState::SendStart(send),
 40            jid,
 41            ns,
 42        }
 43    }
 44}
 45
 46impl<S: AsyncRead + AsyncWrite> Future for StreamStart<S> {
 47    type Item = XMPPStream<S>;
 48    type Error = Error;
 49
 50    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
 51        let old_state = replace(&mut self.state, StreamStartState::Invalid);
 52        let mut retry = false;
 53
 54        let (new_state, result) = match old_state {
 55            StreamStartState::SendStart(mut send) =>
 56                match send.poll() {
 57                    Ok(Async::Ready(stream)) => {
 58                        retry = true;
 59                        (StreamStartState::RecvStart(stream), Ok(Async::NotReady))
 60                    },
 61                    Ok(Async::NotReady) =>
 62                        (StreamStartState::SendStart(send), Ok(Async::NotReady)),
 63                    Err(e) =>
 64                        (StreamStartState::Invalid, Err(e.into())),
 65                },
 66            StreamStartState::RecvStart(mut stream) =>
 67                match stream.poll() {
 68                    Ok(Async::Ready(Some(Packet::StreamStart(stream_attrs)))) => {
 69                        let stream_ns = match stream_attrs.get("xmlns") {
 70                            Some(ns) => ns.clone(),
 71                            None =>
 72                                return Err(ProtocolError::NoStreamNamespace.into()),
 73                        };
 74                        if self.ns == "jabber:client" {
 75                            retry = true;
 76                            // TODO: skip RecvFeatures for version < 1.0
 77                            (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady))
 78                        } else {
 79                            let id = match stream_attrs.get("id") {
 80                                Some(id) => id.clone(),
 81                                None =>
 82                                    return Err(ProtocolError::NoStreamId.into()),
 83                            };
 84                                                                                                    // FIXME: huge hack, shouldn’t be an element!
 85                            let stream = XMPPStream::new(self.jid.clone(), stream, self.ns.clone(), Element::builder(id).build());
 86                            (StreamStartState::Invalid, Ok(Async::Ready(stream)))
 87                        }
 88                    },
 89                    Ok(Async::Ready(_)) =>
 90                        return Err(ProtocolError::InvalidToken.into()),
 91                    Ok(Async::NotReady) =>
 92                        (StreamStartState::RecvStart(stream), Ok(Async::NotReady)),
 93                    Err(e) =>
 94                        return Err(ProtocolError::from(e).into()),
 95                },
 96            StreamStartState::RecvFeatures(mut stream, stream_ns) =>
 97                match stream.poll() {
 98                    Ok(Async::Ready(Some(Packet::Stanza(stanza)))) =>
 99                        if stanza.is("features", NS_XMPP_STREAM) {
100                            let stream = XMPPStream::new(self.jid.clone(), stream, self.ns.clone(), stanza);
101                            (StreamStartState::Invalid, Ok(Async::Ready(stream)))
102                        } else {
103                            (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady))
104                        },
105                    Ok(Async::Ready(_)) | Ok(Async::NotReady) =>
106                        (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady)),
107                    Err(e) =>
108                        return Err(ProtocolError::from(e).into()),
109                },
110            StreamStartState::Invalid =>
111                unreachable!(),
112        };
113
114        self.state = new_state;
115        if retry {
116            self.poll()
117        } else {
118            result
119        }
120    }
121}
122
123// #[derive(Debug)]
124// pub enum StreamStartError {
125//     MissingStreamNs,
126//     MissingStreamId,
127//     Unexpected,
128//     Parser(ParserError),
129//     IO(io::Error),
130// }
131
132// impl From<io::Error> for StreamStartError {
133//     fn from(e: io::Error) -> Self {
134//         StreamStartError::IO(e)
135//     }
136// }
137
138// impl From<ParserError> for StreamStartError {
139//     fn from(e: ParserError) -> Self {
140//         match e {
141//             ParserError::IO(e) => StreamStartError::IO(e),
142//             _ => StreamStartError::Parser(e)
143//         }
144//     }
145// }
146
147// impl StdError for StreamStartError {
148//     fn description(&self) -> &str {
149//         match *self {
150//             StreamStartError::MissingStreamNs => "Missing stream namespace",
151//             StreamStartError::MissingStreamId => "Missing stream id",
152//             StreamStartError::Unexpected => "Unexpected",
153//             StreamStartError::Parser(ref pe) => pe.description(),
154//             StreamStartError::IO(ref ie) => ie.description(),
155//         }
156//     }
157
158//     fn cause(&self) -> Option<&StdError> {
159//         match *self {
160//             StreamStartError::Parser(ref pe) => pe.cause(),
161//             StreamStartError::IO(ref ie) => ie.cause(),
162//             _ => None,
163//         }
164//     }
165// }
166
167// impl fmt::Display for StreamStartError {
168//     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
169//         match *self {
170//             StreamStartError::MissingStreamNs => write!(f, "Missing stream namespace"),
171//             StreamStartError::MissingStreamId => write!(f, "Missing stream id"),
172//             StreamStartError::Unexpected => write!(f, "Received unexpected data"),
173//             StreamStartError::Parser(ref pe) => write!(f, "{}", pe),
174//             StreamStartError::IO(ref ie) => write!(f, "{}", ie),
175//         }
176//     }
177// }