1//! Components in XMPP are services/gateways that are logged into an
2//! XMPP server under a JID consisting of just a domain name. They are
3//! allowed to use any user and resource identifiers in their stanzas.
4use std::mem::replace;
5use std::str::FromStr;
6use std::error::Error;
7use tokio::net::TcpStream;
8use tokio_io::{AsyncRead, AsyncWrite};
9use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink};
10use minidom::Element;
11use jid::{Jid, JidParseError};
12
13use super::xmpp_codec::Packet;
14use super::xmpp_stream;
15use super::happy_eyeballs::Connecter;
16use super::event::Event;
17
18mod auth;
19use self::auth::ComponentAuth;
20
21/// Component connection to an XMPP server
22pub struct Component {
23 /// The component's Jabber-Id
24 pub jid: Jid,
25 state: ComponentState,
26}
27
28type XMPPStream = xmpp_stream::XMPPStream<TcpStream>;
29const NS_JABBER_COMPONENT_ACCEPT: &str = "jabber:component:accept";
30
31enum ComponentState {
32 Invalid,
33 Disconnected,
34 Connecting(Box<Future<Item=XMPPStream, Error=String>>),
35 Connected(XMPPStream),
36}
37
38impl Component {
39 /// Start a new XMPP component
40 ///
41 /// Start polling the returned instance so that it will connect
42 /// and yield events.
43 pub fn new(jid: &str, password: &str, server: &str, port: u16) -> Result<Self, JidParseError> {
44 let jid = Jid::from_str(jid)?;
45 let password = password.to_owned();
46 let connect = Self::make_connect(jid.clone(), password, server, port);
47 Ok(Component {
48 jid,
49 state: ComponentState::Connecting(connect),
50 })
51 }
52
53 fn make_connect(jid: Jid, password: String, server: &str, port: u16) -> Box<Future<Item=XMPPStream, Error=String>> {
54 let jid1 = jid.clone();
55 let password = password;
56 Box::new(
57 Connecter::from_lookup(server, "_xmpp-component._tcp", port)
58 .expect("Connector::from_lookup")
59 .and_then(move |tcp_stream| {
60 xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_COMPONENT_ACCEPT.to_owned())
61 .map_err(|e| format!("{}", e))
62 }).and_then(move |xmpp_stream| {
63 Self::auth(xmpp_stream, password).expect("auth")
64 }).and_then(|xmpp_stream| {
65 // println!("Bound to {}", xmpp_stream.jid);
66 Ok(xmpp_stream)
67 })
68 )
69 }
70
71 fn auth<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>, password: String) -> Result<ComponentAuth<S>, String> {
72 ComponentAuth::new(stream, password)
73 }
74}
75
76impl Stream for Component {
77 type Item = Event;
78 type Error = String;
79
80 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
81 let state = replace(&mut self.state, ComponentState::Invalid);
82
83 match state {
84 ComponentState::Invalid =>
85 Err("invalid client state".to_owned()),
86 ComponentState::Disconnected =>
87 Ok(Async::Ready(None)),
88 ComponentState::Connecting(mut connect) => {
89 match connect.poll() {
90 Ok(Async::Ready(stream)) => {
91 self.state = ComponentState::Connected(stream);
92 Ok(Async::Ready(Some(Event::Online)))
93 },
94 Ok(Async::NotReady) => {
95 self.state = ComponentState::Connecting(connect);
96 Ok(Async::NotReady)
97 },
98 Err(e) =>
99 Err(e),
100 }
101 },
102 ComponentState::Connected(mut stream) => {
103 // Poll sink
104 match stream.poll_complete() {
105 Ok(Async::NotReady) => (),
106 Ok(Async::Ready(())) => (),
107 Err(e) =>
108 return Err(e.description().to_owned()),
109 };
110
111 // Poll stream
112 match stream.poll() {
113 Ok(Async::NotReady) => {
114 self.state = ComponentState::Connected(stream);
115 Ok(Async::NotReady)
116 },
117 Ok(Async::Ready(None)) => {
118 // EOF
119 self.state = ComponentState::Disconnected;
120 Ok(Async::Ready(Some(Event::Disconnected)))
121 },
122 Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
123 self.state = ComponentState::Connected(stream);
124 Ok(Async::Ready(Some(Event::Stanza(stanza))))
125 },
126 Ok(Async::Ready(_)) => {
127 self.state = ComponentState::Connected(stream);
128 Ok(Async::NotReady)
129 },
130 Err(e) =>
131 Err(e.description().to_owned()),
132 }
133 },
134 }
135 }
136}
137
138impl Sink for Component {
139 type SinkItem = Element;
140 type SinkError = String;
141
142 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
143 match self.state {
144 ComponentState::Connected(ref mut stream) =>
145 match stream.start_send(Packet::Stanza(item)) {
146 Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) =>
147 Ok(AsyncSink::NotReady(stanza)),
148 Ok(AsyncSink::NotReady(_)) =>
149 panic!("Component.start_send with stanza but got something else back"),
150 Ok(AsyncSink::Ready) => {
151 Ok(AsyncSink::Ready)
152 },
153 Err(e) =>
154 Err(e.description().to_owned()),
155 },
156 _ =>
157 Ok(AsyncSink::NotReady(item)),
158 }
159 }
160
161 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
162 match &mut self.state {
163 &mut ComponentState::Connected(ref mut stream) =>
164 stream.poll_complete()
165 .map_err(|e| e.description().to_owned()),
166 _ =>
167 Ok(Async::Ready(())),
168 }
169 }
170}