1use std::mem::replace;
2// use std::error::Error as StdError;
3// use std::{fmt, io};
4use futures::{Future, Async, Poll, Stream, sink, Sink};
5use tokio_io::{AsyncRead, AsyncWrite};
6use tokio_codec::Framed;
7use jid::Jid;
8use minidom::Element;
9
10use xmpp_codec::{XMPPCodec, Packet};
11use xmpp_stream::XMPPStream;
12use {Error, ProtocolError};
13
/// Namespace URI sent as the `xmlns:stream` attribute of the stream header
/// and used to recognise the `features` stanza during stream start.
const NS_XMPP_STREAM: &str = "http://etherx.jabber.org/streams";
15
16pub struct StreamStart<S: AsyncWrite> {
17 state: StreamStartState<S>,
18 jid: Jid,
19 ns: String,
20}
21
22enum StreamStartState<S: AsyncWrite> {
23 SendStart(sink::Send<Framed<S, XMPPCodec>>),
24 RecvStart(Framed<S, XMPPCodec>),
25 RecvFeatures(Framed<S, XMPPCodec>, String),
26 Invalid,
27}
28
29impl<S: AsyncWrite> StreamStart<S> {
30 pub fn from_stream(stream: Framed<S, XMPPCodec>, jid: Jid, ns: String) -> Self {
31 let attrs = [("to".to_owned(), jid.domain.clone()),
32 ("version".to_owned(), "1.0".to_owned()),
33 ("xmlns".to_owned(), ns.clone()),
34 ("xmlns:stream".to_owned(), NS_XMPP_STREAM.to_owned()),
35 ].iter().cloned().collect();
36 let send = stream.send(Packet::StreamStart(attrs));
37
38 StreamStart {
39 state: StreamStartState::SendStart(send),
40 jid,
41 ns,
42 }
43 }
44}
45
46impl<S: AsyncRead + AsyncWrite> Future for StreamStart<S> {
47 type Item = XMPPStream<S>;
48 type Error = Error;
49
50 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
51 let old_state = replace(&mut self.state, StreamStartState::Invalid);
52 let mut retry = false;
53
54 let (new_state, result) = match old_state {
55 StreamStartState::SendStart(mut send) =>
56 match send.poll() {
57 Ok(Async::Ready(stream)) => {
58 retry = true;
59 (StreamStartState::RecvStart(stream), Ok(Async::NotReady))
60 },
61 Ok(Async::NotReady) =>
62 (StreamStartState::SendStart(send), Ok(Async::NotReady)),
63 Err(e) =>
64 (StreamStartState::Invalid, Err(e.into())),
65 },
66 StreamStartState::RecvStart(mut stream) =>
67 match stream.poll() {
68 Ok(Async::Ready(Some(Packet::StreamStart(stream_attrs)))) => {
69 let stream_ns = match stream_attrs.get("xmlns") {
70 Some(ns) => ns.clone(),
71 None =>
72 return Err(ProtocolError::NoStreamNamespace.into()),
73 };
74 if self.ns == "jabber:client" {
75 retry = true;
76 // TODO: skip RecvFeatures for version < 1.0
77 (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady))
78 } else {
79 let id = match stream_attrs.get("id") {
80 Some(id) => id.clone(),
81 None =>
82 return Err(ProtocolError::NoStreamId.into()),
83 };
84 // FIXME: huge hack, shouldn’t be an element!
85 let stream = XMPPStream::new(self.jid.clone(), stream, self.ns.clone(), Element::builder(id).build());
86 (StreamStartState::Invalid, Ok(Async::Ready(stream)))
87 }
88 },
89 Ok(Async::Ready(_)) =>
90 return Err(ProtocolError::InvalidToken.into()),
91 Ok(Async::NotReady) =>
92 (StreamStartState::RecvStart(stream), Ok(Async::NotReady)),
93 Err(e) =>
94 return Err(ProtocolError::from(e).into()),
95 },
96 StreamStartState::RecvFeatures(mut stream, stream_ns) =>
97 match stream.poll() {
98 Ok(Async::Ready(Some(Packet::Stanza(stanza)))) =>
99 if stanza.is("features", NS_XMPP_STREAM) {
100 let stream = XMPPStream::new(self.jid.clone(), stream, self.ns.clone(), stanza);
101 (StreamStartState::Invalid, Ok(Async::Ready(stream)))
102 } else {
103 (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady))
104 },
105 Ok(Async::Ready(_)) | Ok(Async::NotReady) =>
106 (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady)),
107 Err(e) =>
108 return Err(ProtocolError::from(e).into()),
109 },
110 StreamStartState::Invalid =>
111 unreachable!(),
112 };
113
114 self.state = new_state;
115 if retry {
116 self.poll()
117 } else {
118 result
119 }
120 }
121}
122
123// #[derive(Debug)]
124// pub enum StreamStartError {
125// MissingStreamNs,
126// MissingStreamId,
127// Unexpected,
128// Parser(ParserError),
129// IO(io::Error),
130// }
131
132// impl From<io::Error> for StreamStartError {
133// fn from(e: io::Error) -> Self {
134// StreamStartError::IO(e)
135// }
136// }
137
138// impl From<ParserError> for StreamStartError {
139// fn from(e: ParserError) -> Self {
140// match e {
141// ParserError::IO(e) => StreamStartError::IO(e),
142// _ => StreamStartError::Parser(e)
143// }
144// }
145// }
146
147// impl StdError for StreamStartError {
148// fn description(&self) -> &str {
149// match *self {
150// StreamStartError::MissingStreamNs => "Missing stream namespace",
151// StreamStartError::MissingStreamId => "Missing stream id",
152// StreamStartError::Unexpected => "Unexpected",
153// StreamStartError::Parser(ref pe) => pe.description(),
154// StreamStartError::IO(ref ie) => ie.description(),
155// }
156// }
157
158// fn cause(&self) -> Option<&StdError> {
159// match *self {
160// StreamStartError::Parser(ref pe) => pe.cause(),
161// StreamStartError::IO(ref ie) => ie.cause(),
162// _ => None,
163// }
164// }
165// }
166
167// impl fmt::Display for StreamStartError {
168// fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
169// match *self {
170// StreamStartError::MissingStreamNs => write!(f, "Missing stream namespace"),
171// StreamStartError::MissingStreamId => write!(f, "Missing stream id"),
172// StreamStartError::Unexpected => write!(f, "Received unexpected data"),
173// StreamStartError::Parser(ref pe) => write!(f, "{}", pe),
174// StreamStartError::IO(ref ie) => write!(f, "{}", ie),
175// }
176// }
177// }