mod.rs

  1//! Components in XMPP are services/gateways that are logged into an
  2//! XMPP server under a JID consisting of just a domain name. They are
  3//! allowed to use any user and resource identifiers in their stanzas.
  4use futures::{done, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
  5use xmpp_parsers::{Jid, JidParseError, Element};
  6use std::mem::replace;
  7use std::str::FromStr;
  8use tokio::net::TcpStream;
  9use tokio_io::{AsyncRead, AsyncWrite};
 10
 11use super::event::Event;
 12use super::happy_eyeballs::Connecter;
 13use super::xmpp_codec::Packet;
 14use super::xmpp_stream;
 15use super::Error;
 16
 17mod auth;
 18use self::auth::ComponentAuth;
 19
 20/// Component connection to an XMPP server
 21pub struct Component {
 22    /// The component's Jabber-Id
 23    pub jid: Jid,
 24    state: ComponentState,
 25}
 26
 27type XMPPStream = xmpp_stream::XMPPStream<TcpStream>;
 28const NS_JABBER_COMPONENT_ACCEPT: &str = "jabber:component:accept";
 29
 30enum ComponentState {
 31    Invalid,
 32    Disconnected,
 33    Connecting(Box<dyn Future<Item = XMPPStream, Error = Error>>),
 34    Connected(XMPPStream),
 35}
 36
 37impl Component {
 38    /// Start a new XMPP component
 39    ///
 40    /// Start polling the returned instance so that it will connect
 41    /// and yield events.
 42    pub fn new(jid: &str, password: &str, server: &str, port: u16) -> Result<Self, JidParseError> {
 43        let jid = Jid::from_str(jid)?;
 44        let password = password.to_owned();
 45        let connect = Self::make_connect(jid.clone(), password, server, port);
 46        Ok(Component {
 47            jid,
 48            state: ComponentState::Connecting(Box::new(connect)),
 49        })
 50    }
 51
 52    fn make_connect(
 53        jid: Jid,
 54        password: String,
 55        server: &str,
 56        port: u16,
 57    ) -> impl Future<Item = XMPPStream, Error = Error> {
 58        let jid1 = jid.clone();
 59        let password = password;
 60        done(Connecter::from_lookup(server, None, port))
 61            .flatten()
 62            .and_then(move |tcp_stream| {
 63                xmpp_stream::XMPPStream::start(
 64                    tcp_stream,
 65                    jid1,
 66                    NS_JABBER_COMPONENT_ACCEPT.to_owned(),
 67                )
 68            })
 69            .and_then(move |xmpp_stream| Self::auth(xmpp_stream, password).expect("auth"))
 70    }
 71
 72    fn auth<S: AsyncRead + AsyncWrite>(
 73        stream: xmpp_stream::XMPPStream<S>,
 74        password: String,
 75    ) -> Result<ComponentAuth<S>, Error> {
 76        ComponentAuth::new(stream, password)
 77    }
 78}
 79
 80impl Stream for Component {
 81    type Item = Event;
 82    type Error = Error;
 83
 84    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
 85        let state = replace(&mut self.state, ComponentState::Invalid);
 86
 87        match state {
 88            ComponentState::Invalid => Err(Error::InvalidState),
 89            ComponentState::Disconnected => Ok(Async::Ready(None)),
 90            ComponentState::Connecting(mut connect) => match connect.poll() {
 91                Ok(Async::Ready(stream)) => {
 92                    self.state = ComponentState::Connected(stream);
 93                    Ok(Async::Ready(Some(Event::Online(self.jid.clone()))))
 94                }
 95                Ok(Async::NotReady) => {
 96                    self.state = ComponentState::Connecting(connect);
 97                    Ok(Async::NotReady)
 98                }
 99                Err(e) => Err(e),
100            },
101            ComponentState::Connected(mut stream) => {
102                // Poll sink
103                match stream.poll_complete() {
104                    Ok(Async::NotReady) => (),
105                    Ok(Async::Ready(())) => (),
106                    Err(e) => return Err(e)?,
107                };
108
109                // Poll stream
110                match stream.poll() {
111                    Ok(Async::NotReady) => {
112                        self.state = ComponentState::Connected(stream);
113                        Ok(Async::NotReady)
114                    }
115                    Ok(Async::Ready(None)) => {
116                        // EOF
117                        self.state = ComponentState::Disconnected;
118                        Ok(Async::Ready(Some(Event::Disconnected)))
119                    }
120                    Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
121                        self.state = ComponentState::Connected(stream);
122                        Ok(Async::Ready(Some(Event::Stanza(stanza))))
123                    }
124                    Ok(Async::Ready(_)) => {
125                        self.state = ComponentState::Connected(stream);
126                        Ok(Async::NotReady)
127                    }
128                    Err(e) => Err(e)?,
129                }
130            }
131        }
132    }
133}
134
135impl Sink for Component {
136    type SinkItem = Element;
137    type SinkError = Error;
138
139    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
140        match self.state {
141            ComponentState::Connected(ref mut stream) => match stream
142                .start_send(Packet::Stanza(item))
143            {
144                Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) => Ok(AsyncSink::NotReady(stanza)),
145                Ok(AsyncSink::NotReady(_)) => {
146                    panic!("Component.start_send with stanza but got something else back")
147                }
148                Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready),
149                Err(e) => Err(e)?,
150            },
151            _ => Ok(AsyncSink::NotReady(item)),
152        }
153    }
154
155    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
156        match &mut self.state {
157            &mut ComponentState::Connected(ref mut stream) => {
158                stream.poll_complete().map_err(|e| e.into())
159            }
160            _ => Ok(Async::Ready(())),
161        }
162    }
163}