mod.rs

  1use std::mem::replace;
  2use std::str::FromStr;
  3use std::error::Error;
  4use tokio_core::reactor::Handle;
  5use tokio_core::net::TcpStream;
  6use tokio_io::{AsyncRead, AsyncWrite};
  7use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink};
  8use minidom::Element;
  9use jid::{Jid, JidParseError};
 10
 11use super::xmpp_codec::Packet;
 12use super::xmpp_stream;
 13use super::happy_eyeballs::Connecter;
 14
 15mod auth;
 16use self::auth::ComponentAuth;
 17mod event;
 18pub use self::event::Event as ComponentEvent;
 19
 20pub struct Component {
 21    pub jid: Jid,
 22    state: ComponentState,
 23}
 24
 25type XMPPStream = xmpp_stream::XMPPStream<TcpStream>;
 26const NS_JABBER_COMPONENT_ACCEPT: &str = "jabber:component:accept";
 27
 28enum ComponentState {
 29    Invalid,
 30    Disconnected,
 31    Connecting(Box<Future<Item=XMPPStream, Error=String>>),
 32    Connected(XMPPStream),
 33}
 34
 35impl Component {
 36    pub fn new(jid: &str, password: &str, server: &str, port: u16, handle: Handle) -> Result<Self, JidParseError> {
 37        let jid = try!(Jid::from_str(jid));
 38        let password = password.to_owned();
 39        let connect = Self::make_connect(jid.clone(), password, server, port, handle);
 40        Ok(Component {
 41            jid,
 42            state: ComponentState::Connecting(connect),
 43        })
 44    }
 45
 46    fn make_connect(jid: Jid, password: String, server: &str, port: u16, handle: Handle) -> Box<Future<Item=XMPPStream, Error=String>> {
 47        let jid1 = jid.clone();
 48        let password = password;
 49        Box::new(
 50            Connecter::from_lookup(handle, server, "_xmpp-component._tcp", port)
 51                .expect("Connector::from_lookup")
 52                .and_then(move |tcp_stream| {
 53                    xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_COMPONENT_ACCEPT.to_owned())
 54                    .map_err(|e| format!("{}", e))
 55                }).and_then(move |xmpp_stream| {
 56                    Self::auth(xmpp_stream, password).expect("auth")
 57                }).and_then(|xmpp_stream| {
 58                    println!("Bound to {}", xmpp_stream.jid);
 59                    Ok(xmpp_stream)
 60                })
 61        )
 62    }
 63
 64    fn auth<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>, password: String) -> Result<ComponentAuth<S>, String> {
 65        ComponentAuth::new(stream, password)
 66    }
 67}
 68
 69impl Stream for Component {
 70    type Item = ComponentEvent;
 71    type Error = String;
 72
 73    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
 74        let state = replace(&mut self.state, ComponentState::Invalid);
 75
 76        match state {
 77            ComponentState::Invalid =>
 78                Err("invalid client state".to_owned()),
 79            ComponentState::Disconnected =>
 80                Ok(Async::Ready(None)),
 81            ComponentState::Connecting(mut connect) => {
 82                match connect.poll() {
 83                    Ok(Async::Ready(stream)) => {
 84                        self.state = ComponentState::Connected(stream);
 85                        Ok(Async::Ready(Some(ComponentEvent::Online)))
 86                    },
 87                    Ok(Async::NotReady) => {
 88                        self.state = ComponentState::Connecting(connect);
 89                        Ok(Async::NotReady)
 90                    },
 91                    Err(e) =>
 92                        Err(e),
 93                }
 94            },
 95            ComponentState::Connected(mut stream) => {
 96                match stream.poll() {
 97                    Ok(Async::NotReady) => {
 98                        self.state = ComponentState::Connected(stream);
 99                        Ok(Async::NotReady)
100                    },
101                    Ok(Async::Ready(None)) => {
102                        // EOF
103                        self.state = ComponentState::Disconnected;
104                        Ok(Async::Ready(Some(ComponentEvent::Disconnected)))
105                    },
106                    Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
107                        self.state = ComponentState::Connected(stream);
108                        Ok(Async::Ready(Some(ComponentEvent::Stanza(stanza))))
109                    },
110                    Ok(Async::Ready(_)) => {
111                        self.state = ComponentState::Connected(stream);
112                        Ok(Async::NotReady)
113                    },
114                    Err(e) =>
115                        Err(e.description().to_owned()),
116                }
117            },
118        }
119    }
120}
121
122impl Sink for Component {
123    type SinkItem = Element;
124    type SinkError = String;
125
126    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
127        match self.state {
128            ComponentState::Connected(ref mut stream) =>
129                match stream.start_send(Packet::Stanza(item)) {
130                    Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) =>
131                        Ok(AsyncSink::NotReady(stanza)),
132                    Ok(AsyncSink::NotReady(_)) =>
133                        panic!("Component.start_send with stanza but got something else back"),
134                    Ok(AsyncSink::Ready) => {
135                        Ok(AsyncSink::Ready)
136                    },
137                    Err(e) =>
138                        Err(e.description().to_owned()),
139                },
140            _ =>
141                Ok(AsyncSink::NotReady(item)),
142        }
143    }
144
145    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
146        match &mut self.state {
147            &mut ComponentState::Connected(ref mut stream) =>
148                stream.poll_complete()
149                .map_err(|e| e.description().to_owned()),
150            _ =>
151                Ok(Async::Ready(())),
152        }
153    }
154}