1//! Components in XMPP are services/gateways that are logged into an
2//! XMPP server under a JID consisting of just a domain name. They are
3//! allowed to use any user and resource identifiers in their stanzas.
4use std::mem::replace;
5use std::str::FromStr;
6use std::error::Error;
7use tokio_core::reactor::Handle;
8use tokio_core::net::TcpStream;
9use tokio_io::{AsyncRead, AsyncWrite};
10use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink};
11use minidom::Element;
12use jid::{Jid, JidParseError};
13
14use super::xmpp_codec::Packet;
15use super::xmpp_stream;
16use super::happy_eyeballs::Connecter;
17use super::event::Event;
18
19mod auth;
20use self::auth::ComponentAuth;
21
22/// Component connection to an XMPP server
23pub struct Component {
24 /// The component's Jabber-Id
25 pub jid: Jid,
26 state: ComponentState,
27}
28
29type XMPPStream = xmpp_stream::XMPPStream<TcpStream>;
30const NS_JABBER_COMPONENT_ACCEPT: &str = "jabber:component:accept";
31
32enum ComponentState {
33 Invalid,
34 Disconnected,
35 Connecting(Box<Future<Item=XMPPStream, Error=String>>),
36 Connected(XMPPStream),
37}
38
39impl Component {
40 /// Start a new XMPP component
41 ///
42 /// Start polling the returned instance so that it will connect
43 /// and yield events.
44 pub fn new(jid: &str, password: &str, server: &str, port: u16, handle: Handle) -> Result<Self, JidParseError> {
45 let jid = try!(Jid::from_str(jid));
46 let password = password.to_owned();
47 let connect = Self::make_connect(jid.clone(), password, server, port, handle);
48 Ok(Component {
49 jid,
50 state: ComponentState::Connecting(connect),
51 })
52 }
53
54 fn make_connect(jid: Jid, password: String, server: &str, port: u16, handle: Handle) -> Box<Future<Item=XMPPStream, Error=String>> {
55 let jid1 = jid.clone();
56 let password = password;
57 Box::new(
58 Connecter::from_lookup(handle, server, "_xmpp-component._tcp", port)
59 .expect("Connector::from_lookup")
60 .and_then(move |tcp_stream| {
61 xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_COMPONENT_ACCEPT.to_owned())
62 .map_err(|e| format!("{}", e))
63 }).and_then(move |xmpp_stream| {
64 Self::auth(xmpp_stream, password).expect("auth")
65 }).and_then(|xmpp_stream| {
66 println!("Bound to {}", xmpp_stream.jid);
67 Ok(xmpp_stream)
68 })
69 )
70 }
71
72 fn auth<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>, password: String) -> Result<ComponentAuth<S>, String> {
73 ComponentAuth::new(stream, password)
74 }
75}
76
77impl Stream for Component {
78 type Item = Event;
79 type Error = String;
80
81 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
82 let state = replace(&mut self.state, ComponentState::Invalid);
83
84 match state {
85 ComponentState::Invalid =>
86 Err("invalid client state".to_owned()),
87 ComponentState::Disconnected =>
88 Ok(Async::Ready(None)),
89 ComponentState::Connecting(mut connect) => {
90 match connect.poll() {
91 Ok(Async::Ready(stream)) => {
92 self.state = ComponentState::Connected(stream);
93 Ok(Async::Ready(Some(Event::Online)))
94 },
95 Ok(Async::NotReady) => {
96 self.state = ComponentState::Connecting(connect);
97 Ok(Async::NotReady)
98 },
99 Err(e) =>
100 Err(e),
101 }
102 },
103 ComponentState::Connected(mut stream) => {
104 // Poll sink
105 match stream.poll_complete() {
106 Ok(Async::NotReady) => (),
107 Ok(Async::Ready(())) => (),
108 Err(e) =>
109 return Err(e.description().to_owned()),
110 };
111
112 // Poll stream
113 match stream.poll() {
114 Ok(Async::NotReady) => {
115 self.state = ComponentState::Connected(stream);
116 Ok(Async::NotReady)
117 },
118 Ok(Async::Ready(None)) => {
119 // EOF
120 self.state = ComponentState::Disconnected;
121 Ok(Async::Ready(Some(Event::Disconnected)))
122 },
123 Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
124 self.state = ComponentState::Connected(stream);
125 Ok(Async::Ready(Some(Event::Stanza(stanza))))
126 },
127 Ok(Async::Ready(_)) => {
128 self.state = ComponentState::Connected(stream);
129 Ok(Async::NotReady)
130 },
131 Err(e) =>
132 Err(e.description().to_owned()),
133 }
134 },
135 }
136 }
137}
138
139impl Sink for Component {
140 type SinkItem = Element;
141 type SinkError = String;
142
143 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
144 match self.state {
145 ComponentState::Connected(ref mut stream) =>
146 match stream.start_send(Packet::Stanza(item)) {
147 Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) =>
148 Ok(AsyncSink::NotReady(stanza)),
149 Ok(AsyncSink::NotReady(_)) =>
150 panic!("Component.start_send with stanza but got something else back"),
151 Ok(AsyncSink::Ready) => {
152 Ok(AsyncSink::Ready)
153 },
154 Err(e) =>
155 Err(e.description().to_owned()),
156 },
157 _ =>
158 Ok(AsyncSink::NotReady(item)),
159 }
160 }
161
162 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
163 match &mut self.state {
164 &mut ComponentState::Connected(ref mut stream) =>
165 stream.poll_complete()
166 .map_err(|e| e.description().to_owned()),
167 _ =>
168 Ok(Async::Ready(())),
169 }
170 }
171}