1use futures::{sink, Async, Future, Poll, Sink, Stream};
2use jid::Jid;
3use minidom::Element;
4use std::mem::replace;
5use tokio_codec::Framed;
6use tokio_io::{AsyncRead, AsyncWrite};
7
8use crate::xmpp_codec::{Packet, XMPPCodec};
9use crate::xmpp_stream::XMPPStream;
10use crate::{Error, ProtocolError};
11
/// Namespace URI sent as the `xmlns:stream` attribute of the opening stream
/// header, and used to recognise the `features` element received from the peer.
const NS_XMPP_STREAM: &str = "http://etherx.jabber.org/streams";
13
14pub struct StreamStart<S: AsyncWrite> {
15 state: StreamStartState<S>,
16 jid: Jid,
17 ns: String,
18}
19
20enum StreamStartState<S: AsyncWrite> {
21 SendStart(sink::Send<Framed<S, XMPPCodec>>),
22 RecvStart(Framed<S, XMPPCodec>),
23 RecvFeatures(Framed<S, XMPPCodec>, String),
24 Invalid,
25}
26
27impl<S: AsyncWrite> StreamStart<S> {
28 pub fn from_stream(stream: Framed<S, XMPPCodec>, jid: Jid, ns: String) -> Self {
29 let attrs = [
30 ("to".to_owned(), jid.domain.clone()),
31 ("version".to_owned(), "1.0".to_owned()),
32 ("xmlns".to_owned(), ns.clone()),
33 ("xmlns:stream".to_owned(), NS_XMPP_STREAM.to_owned()),
34 ]
35 .iter()
36 .cloned()
37 .collect();
38 let send = stream.send(Packet::StreamStart(attrs));
39
40 StreamStart {
41 state: StreamStartState::SendStart(send),
42 jid,
43 ns,
44 }
45 }
46}
47
48impl<S: AsyncRead + AsyncWrite> Future for StreamStart<S> {
49 type Item = XMPPStream<S>;
50 type Error = Error;
51
52 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
53 let old_state = replace(&mut self.state, StreamStartState::Invalid);
54 let mut retry = false;
55
56 let (new_state, result) = match old_state {
57 StreamStartState::SendStart(mut send) => match send.poll() {
58 Ok(Async::Ready(stream)) => {
59 retry = true;
60 (StreamStartState::RecvStart(stream), Ok(Async::NotReady))
61 }
62 Ok(Async::NotReady) => (StreamStartState::SendStart(send), Ok(Async::NotReady)),
63 Err(e) => (StreamStartState::Invalid, Err(e.into())),
64 },
65 StreamStartState::RecvStart(mut stream) => match stream.poll() {
66 Ok(Async::Ready(Some(Packet::StreamStart(stream_attrs)))) => {
67 let stream_ns = stream_attrs
68 .get("xmlns")
69 .ok_or(ProtocolError::NoStreamNamespace)?
70 .clone();
71 if self.ns == "jabber:client" {
72 retry = true;
73 // TODO: skip RecvFeatures for version < 1.0
74 (
75 StreamStartState::RecvFeatures(stream, stream_ns),
76 Ok(Async::NotReady),
77 )
78 } else {
79 let id = stream_attrs
80 .get("id")
81 .ok_or(ProtocolError::NoStreamId)?
82 .clone();
83 // FIXME: huge hack, shouldn’t be an element!
84 let stream = XMPPStream::new(
85 self.jid.clone(),
86 stream,
87 self.ns.clone(),
88 Element::builder(id).build(),
89 );
90 (StreamStartState::Invalid, Ok(Async::Ready(stream)))
91 }
92 }
93 Ok(Async::Ready(_)) => return Err(ProtocolError::InvalidToken.into()),
94 Ok(Async::NotReady) => (StreamStartState::RecvStart(stream), Ok(Async::NotReady)),
95 Err(e) => return Err(ProtocolError::from(e).into()),
96 },
97 StreamStartState::RecvFeatures(mut stream, stream_ns) => match stream.poll() {
98 Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
99 if stanza.is("features", NS_XMPP_STREAM) {
100 let stream =
101 XMPPStream::new(self.jid.clone(), stream, self.ns.clone(), stanza);
102 (StreamStartState::Invalid, Ok(Async::Ready(stream)))
103 } else {
104 (
105 StreamStartState::RecvFeatures(stream, stream_ns),
106 Ok(Async::NotReady),
107 )
108 }
109 }
110 Ok(Async::Ready(_)) | Ok(Async::NotReady) => (
111 StreamStartState::RecvFeatures(stream, stream_ns),
112 Ok(Async::NotReady),
113 ),
114 Err(e) => return Err(ProtocolError::from(e).into()),
115 },
116 StreamStartState::Invalid => unreachable!(),
117 };
118
119 self.state = new_state;
120 if retry {
121 self.poll()
122 } else {
123 result
124 }
125 }
126}