mod.rs

  1//! Components in XMPP are services/gateways that are logged into an
  2//! XMPP server under a JID consisting of just a domain name. They are
  3//! allowed to use any user and resource identifiers in their stanzas.
  4use std::mem::replace;
  5use std::str::FromStr;
  6use std::error::Error;
  7use tokio_core::reactor::Handle;
  8use tokio_core::net::TcpStream;
  9use tokio_io::{AsyncRead, AsyncWrite};
 10use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink};
 11use minidom::Element;
 12use jid::{Jid, JidParseError};
 13
 14use super::xmpp_codec::Packet;
 15use super::xmpp_stream;
 16use super::happy_eyeballs::Connecter;
 17use super::event::Event;
 18
 19mod auth;
 20use self::auth::ComponentAuth;
 21
 22/// Component connection to an XMPP server
 23pub struct Component {
 24    /// The component's Jabber-Id
 25    pub jid: Jid,
 26    state: ComponentState,
 27}
 28
 29type XMPPStream = xmpp_stream::XMPPStream<TcpStream>;
 30const NS_JABBER_COMPONENT_ACCEPT: &str = "jabber:component:accept";
 31
 32enum ComponentState {
 33    Invalid,
 34    Disconnected,
 35    Connecting(Box<Future<Item=XMPPStream, Error=String>>),
 36    Connected(XMPPStream),
 37}
 38
 39impl Component {
 40    /// Start a new XMPP component
 41    ///
 42    /// Start polling the returned instance so that it will connect
 43    /// and yield events.
 44    pub fn new(jid: &str, password: &str, server: &str, port: u16, handle: Handle) -> Result<Self, JidParseError> {
 45        let jid = Jid::from_str(jid)?;
 46        let password = password.to_owned();
 47        let connect = Self::make_connect(jid.clone(), password, server, port, handle);
 48        Ok(Component {
 49            jid,
 50            state: ComponentState::Connecting(connect),
 51        })
 52    }
 53
 54    fn make_connect(jid: Jid, password: String, server: &str, port: u16, handle: Handle) -> Box<Future<Item=XMPPStream, Error=String>> {
 55        let jid1 = jid.clone();
 56        let password = password;
 57        Box::new(
 58            Connecter::from_lookup(handle, server, "_xmpp-component._tcp", port)
 59                .expect("Connector::from_lookup")
 60                .and_then(move |tcp_stream| {
 61                    xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_COMPONENT_ACCEPT.to_owned())
 62                    .map_err(|e| format!("{}", e))
 63                }).and_then(move |xmpp_stream| {
 64                    Self::auth(xmpp_stream, password).expect("auth")
 65                }).and_then(|xmpp_stream| {
 66                    println!("Bound to {}", xmpp_stream.jid);
 67                    Ok(xmpp_stream)
 68                })
 69        )
 70    }
 71
 72    fn auth<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>, password: String) -> Result<ComponentAuth<S>, String> {
 73        ComponentAuth::new(stream, password)
 74    }
 75}
 76
 77impl Stream for Component {
 78    type Item = Event;
 79    type Error = String;
 80
 81    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
 82        let state = replace(&mut self.state, ComponentState::Invalid);
 83
 84        match state {
 85            ComponentState::Invalid =>
 86                Err("invalid client state".to_owned()),
 87            ComponentState::Disconnected =>
 88                Ok(Async::Ready(None)),
 89            ComponentState::Connecting(mut connect) => {
 90                match connect.poll() {
 91                    Ok(Async::Ready(stream)) => {
 92                        self.state = ComponentState::Connected(stream);
 93                        Ok(Async::Ready(Some(Event::Online)))
 94                    },
 95                    Ok(Async::NotReady) => {
 96                        self.state = ComponentState::Connecting(connect);
 97                        Ok(Async::NotReady)
 98                    },
 99                    Err(e) =>
100                        Err(e),
101                }
102            },
103            ComponentState::Connected(mut stream) => {
104                // Poll sink
105                match stream.poll_complete() {
106                    Ok(Async::NotReady) => (),
107                    Ok(Async::Ready(())) => (),
108                    Err(e) =>
109                        return Err(e.description().to_owned()),
110                };
111
112                // Poll stream
113                match stream.poll() {
114                    Ok(Async::NotReady) => {
115                        self.state = ComponentState::Connected(stream);
116                        Ok(Async::NotReady)
117                    },
118                    Ok(Async::Ready(None)) => {
119                        // EOF
120                        self.state = ComponentState::Disconnected;
121                        Ok(Async::Ready(Some(Event::Disconnected)))
122                    },
123                    Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
124                        self.state = ComponentState::Connected(stream);
125                        Ok(Async::Ready(Some(Event::Stanza(stanza))))
126                    },
127                    Ok(Async::Ready(_)) => {
128                        self.state = ComponentState::Connected(stream);
129                        Ok(Async::NotReady)
130                    },
131                    Err(e) =>
132                        Err(e.description().to_owned()),
133                }
134            },
135        }
136    }
137}
138
139impl Sink for Component {
140    type SinkItem = Element;
141    type SinkError = String;
142
143    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
144        match self.state {
145            ComponentState::Connected(ref mut stream) =>
146                match stream.start_send(Packet::Stanza(item)) {
147                    Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) =>
148                        Ok(AsyncSink::NotReady(stanza)),
149                    Ok(AsyncSink::NotReady(_)) =>
150                        panic!("Component.start_send with stanza but got something else back"),
151                    Ok(AsyncSink::Ready) => {
152                        Ok(AsyncSink::Ready)
153                    },
154                    Err(e) =>
155                        Err(e.description().to_owned()),
156                },
157            _ =>
158                Ok(AsyncSink::NotReady(item)),
159        }
160    }
161
162    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
163        match &mut self.state {
164            &mut ComponentState::Connected(ref mut stream) =>
165                stream.poll_complete()
166                .map_err(|e| e.description().to_owned()),
167            _ =>
168                Ok(Async::Ready(())),
169        }
170    }
171}