1//! Components in XMPP are services/gateways that are logged into an
2//! XMPP server under a JID consisting of just a domain name. They are
3//! allowed to use any user and resource identifiers in their stanzas.
4use futures::{done, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
5use xmpp_parsers::{Jid, JidParseError, Element};
6use std::mem::replace;
7use std::str::FromStr;
8use tokio::net::TcpStream;
9use tokio_io::{AsyncRead, AsyncWrite};
10
11use super::event::Event;
12use super::happy_eyeballs::Connecter;
13use super::xmpp_codec::Packet;
14use super::xmpp_stream;
15use super::Error;
16
17mod auth;
18use self::auth::ComponentAuth;
19
20/// Component connection to an XMPP server
21pub struct Component {
22 /// The component's Jabber-Id
23 pub jid: Jid,
24 state: ComponentState,
25}
26
27type XMPPStream = xmpp_stream::XMPPStream<TcpStream>;
/// Namespace passed to `XMPPStream::start` when opening the component stream.
const NS_JABBER_COMPONENT_ACCEPT: &str = "jabber:component:accept";
29
30enum ComponentState {
31 Invalid,
32 Disconnected,
33 Connecting(Box<dyn Future<Item = XMPPStream, Error = Error>>),
34 Connected(XMPPStream),
35}
36
37impl Component {
38 /// Start a new XMPP component
39 ///
40 /// Start polling the returned instance so that it will connect
41 /// and yield events.
42 pub fn new(jid: &str, password: &str, server: &str, port: u16) -> Result<Self, JidParseError> {
43 let jid = Jid::from_str(jid)?;
44 let password = password.to_owned();
45 let connect = Self::make_connect(jid.clone(), password, server, port);
46 Ok(Component {
47 jid,
48 state: ComponentState::Connecting(Box::new(connect)),
49 })
50 }
51
52 fn make_connect(
53 jid: Jid,
54 password: String,
55 server: &str,
56 port: u16,
57 ) -> impl Future<Item = XMPPStream, Error = Error> {
58 let jid1 = jid.clone();
59 let password = password;
60 done(Connecter::from_lookup(server, None, port))
61 .flatten()
62 .and_then(move |tcp_stream| {
63 xmpp_stream::XMPPStream::start(
64 tcp_stream,
65 jid1,
66 NS_JABBER_COMPONENT_ACCEPT.to_owned(),
67 )
68 })
69 .and_then(move |xmpp_stream| Self::auth(xmpp_stream, password).expect("auth"))
70 }
71
72 fn auth<S: AsyncRead + AsyncWrite>(
73 stream: xmpp_stream::XMPPStream<S>,
74 password: String,
75 ) -> Result<ComponentAuth<S>, Error> {
76 ComponentAuth::new(stream, password)
77 }
78}
79
80impl Stream for Component {
81 type Item = Event;
82 type Error = Error;
83
84 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
85 let state = replace(&mut self.state, ComponentState::Invalid);
86
87 match state {
88 ComponentState::Invalid => Err(Error::InvalidState),
89 ComponentState::Disconnected => Ok(Async::Ready(None)),
90 ComponentState::Connecting(mut connect) => match connect.poll() {
91 Ok(Async::Ready(stream)) => {
92 self.state = ComponentState::Connected(stream);
93 Ok(Async::Ready(Some(Event::Online(self.jid.clone()))))
94 }
95 Ok(Async::NotReady) => {
96 self.state = ComponentState::Connecting(connect);
97 Ok(Async::NotReady)
98 }
99 Err(e) => Err(e),
100 },
101 ComponentState::Connected(mut stream) => {
102 // Poll sink
103 match stream.poll_complete() {
104 Ok(Async::NotReady) => (),
105 Ok(Async::Ready(())) => (),
106 Err(e) => return Err(e)?,
107 };
108
109 // Poll stream
110 match stream.poll() {
111 Ok(Async::NotReady) => {
112 self.state = ComponentState::Connected(stream);
113 Ok(Async::NotReady)
114 }
115 Ok(Async::Ready(None)) => {
116 // EOF
117 self.state = ComponentState::Disconnected;
118 Ok(Async::Ready(Some(Event::Disconnected)))
119 }
120 Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
121 self.state = ComponentState::Connected(stream);
122 Ok(Async::Ready(Some(Event::Stanza(stanza))))
123 }
124 Ok(Async::Ready(_)) => {
125 self.state = ComponentState::Connected(stream);
126 Ok(Async::NotReady)
127 }
128 Err(e) => Err(e)?,
129 }
130 }
131 }
132 }
133}
134
135impl Sink for Component {
136 type SinkItem = Element;
137 type SinkError = Error;
138
139 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
140 match self.state {
141 ComponentState::Connected(ref mut stream) => match stream
142 .start_send(Packet::Stanza(item))
143 {
144 Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) => Ok(AsyncSink::NotReady(stanza)),
145 Ok(AsyncSink::NotReady(_)) => {
146 panic!("Component.start_send with stanza but got something else back")
147 }
148 Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready),
149 Err(e) => Err(e)?,
150 },
151 _ => Ok(AsyncSink::NotReady(item)),
152 }
153 }
154
155 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
156 match &mut self.state {
157 &mut ComponentState::Connected(ref mut stream) => {
158 stream.poll_complete().map_err(|e| e.into())
159 }
160 _ => Ok(Async::Ready(())),
161 }
162 }
163}