1//! Components in XMPP are services/gateways that are logged into an
2//! XMPP server under a JID consisting of just a domain name. They are
3//! allowed to use any user and resource identifiers in their stanzas.
4use std::mem::replace;
5use std::str::FromStr;
6use std::error::Error as StdError;
7use tokio_core::reactor::Handle;
8use tokio_core::net::TcpStream;
9use tokio_io::{AsyncRead, AsyncWrite};
10use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink, done};
11use minidom::Element;
12use jid::{Jid, JidParseError};
13
14use super::xmpp_codec::Packet;
15use super::xmpp_stream;
16use super::happy_eyeballs::Connecter;
17use super::event::Event;
18use super::Error;
19
20mod auth;
21use self::auth::ComponentAuth;
22
23/// Component connection to an XMPP server
24pub struct Component {
25 /// The component's Jabber-Id
26 pub jid: Jid,
27 state: ComponentState,
28}
29
30type XMPPStream = xmpp_stream::XMPPStream<TcpStream>;
31const NS_JABBER_COMPONENT_ACCEPT: &str = "jabber:component:accept";
32
33enum ComponentState {
34 Invalid,
35 Disconnected,
36 Connecting(Box<Future<Item=XMPPStream, Error=Error>>),
37 Connected(XMPPStream),
38}
39
40impl Component {
41 /// Start a new XMPP component
42 ///
43 /// Start polling the returned instance so that it will connect
44 /// and yield events.
45 pub fn new(jid: &str, password: &str, server: &str, port: u16, handle: Handle) -> Result<Self, JidParseError> {
46 let jid = Jid::from_str(jid)?;
47 let password = password.to_owned();
48 let connect = Self::make_connect(jid.clone(), password, server, port, handle);
49 Ok(Component {
50 jid,
51 state: ComponentState::Connecting(Box::new(connect)),
52 })
53 }
54
55 fn make_connect(jid: Jid, password: String, server: &str, port: u16, handle: Handle) -> impl Future<Item=XMPPStream, Error=Error> {
56 let jid1 = jid.clone();
57 let password = password;
58 done(Connecter::from_lookup(handle, server, "_xmpp-component._tcp", port))
59 .map_err(Error::Domain)
60 .and_then(|connecter| connecter
61 .map_err(Error::Connection)
62 ).and_then(move |tcp_stream| {
63 xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_COMPONENT_ACCEPT.to_owned())
64 }).and_then(move |xmpp_stream| {
65 Self::auth(xmpp_stream, password).expect("auth")
66 })
67 }
68
69 fn auth<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>, password: String) -> Result<ComponentAuth<S>, Error> {
70 ComponentAuth::new(stream, password)
71 }
72}
73
74impl Stream for Component {
75 type Item = Event;
76 type Error = Error;
77
78 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
79 let state = replace(&mut self.state, ComponentState::Invalid);
80
81 match state {
82 ComponentState::Invalid =>
83 Err(Error::InvalidState),
84 ComponentState::Disconnected =>
85 Ok(Async::Ready(None)),
86 ComponentState::Connecting(mut connect) => {
87 match connect.poll() {
88 Ok(Async::Ready(stream)) => {
89 self.state = ComponentState::Connected(stream);
90 Ok(Async::Ready(Some(Event::Online)))
91 },
92 Ok(Async::NotReady) => {
93 self.state = ComponentState::Connecting(connect);
94 Ok(Async::NotReady)
95 },
96 Err(e) =>
97 Err(e),
98 }
99 },
100 ComponentState::Connected(mut stream) => {
101 // Poll sink
102 match stream.poll_complete() {
103 Ok(Async::NotReady) => (),
104 Ok(Async::Ready(())) => (),
105 Err(e) =>
106 return Err(e.into()),
107 };
108
109 // Poll stream
110 match stream.poll() {
111 Ok(Async::NotReady) => {
112 self.state = ComponentState::Connected(stream);
113 Ok(Async::NotReady)
114 },
115 Ok(Async::Ready(None)) => {
116 // EOF
117 self.state = ComponentState::Disconnected;
118 Ok(Async::Ready(Some(Event::Disconnected)))
119 },
120 Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
121 self.state = ComponentState::Connected(stream);
122 Ok(Async::Ready(Some(Event::Stanza(stanza))))
123 },
124 Ok(Async::Ready(_)) => {
125 self.state = ComponentState::Connected(stream);
126 Ok(Async::NotReady)
127 },
128 Err(e) =>
129 Err(e.into()),
130 }
131 },
132 }
133 }
134}
135
136impl Sink for Component {
137 type SinkItem = Element;
138 type SinkError = String;
139
140 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
141 match self.state {
142 ComponentState::Connected(ref mut stream) =>
143 match stream.start_send(Packet::Stanza(item)) {
144 Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) =>
145 Ok(AsyncSink::NotReady(stanza)),
146 Ok(AsyncSink::NotReady(_)) =>
147 panic!("Component.start_send with stanza but got something else back"),
148 Ok(AsyncSink::Ready) => {
149 Ok(AsyncSink::Ready)
150 },
151 Err(e) =>
152 Err(e.description().to_owned()),
153 },
154 _ =>
155 Ok(AsyncSink::NotReady(item)),
156 }
157 }
158
159 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
160 match &mut self.state {
161 &mut ComponentState::Connected(ref mut stream) =>
162 stream.poll_complete()
163 .map_err(|e| e.description().to_owned()),
164 _ =>
165 Ok(Async::Ready(())),
166 }
167 }
168}