lib: add a component connection method

Emmanuel Gil Peyrot created

Change summary

Cargo.toml                 |   2 
examples/echo_component.rs | 103 ++++++++++++++++++++++++++
src/component/auth.rs      | 101 ++++++++++++++++++++++++++
src/component/event.rs     |  38 +++++++++
src/component/mod.rs       | 154 ++++++++++++++++++++++++++++++++++++++++
src/lib.rs                 |   3 
src/stream_start.rs        |  18 +++
7 files changed, 416 insertions(+), 3 deletions(-)

Detailed changes

Cargo.toml 🔗

@@ -19,3 +19,5 @@ jid = "*"
 domain = "0.2.1"
 xmpp-parsers = "0.6.0"
 idna = "*"
+try_from = "0.2.2"
+sha-1 = "0.4.1"

examples/echo_component.rs 🔗

@@ -0,0 +1,103 @@
+extern crate futures;
+extern crate tokio_core;
+extern crate tokio_xmpp;
+extern crate jid;
+extern crate minidom;
+extern crate xmpp_parsers;
+extern crate try_from;
+
+use std::env::args;
+use std::process::exit;
+use std::str::FromStr;
+use try_from::TryFrom;
+use tokio_core::reactor::Core;
+use futures::{Future, Stream, Sink, future};
+use tokio_xmpp::Component;
+use minidom::Element;
+use xmpp_parsers::presence::{Presence, Type as PresenceType, Show as PresenceShow};
+use xmpp_parsers::message::{Message, MessageType};
+use jid::Jid;
+
+fn main() {
+    let args: Vec<String> = args().collect();
+    if args.len() < 3 || args.len() > 5 {
+        println!("Usage: {} <jid> <password> [server] [port]", args[0]);
+        exit(1);
+    }
+    let jid = &args[1];
+    let password = &args[2];
+    let server = &args.get(3).unwrap().parse().unwrap_or("127.0.0.1".to_owned());
+    let port: u16 = args.get(4).unwrap().parse().unwrap_or(5347u16);
+
+    // tokio_core context
+    let mut core = Core::new().unwrap();
+    // Component instance
+    println!("{} {} {} {} {:?}", jid, password, server, port, core.handle());
+    let component = Component::new(jid, password, server, port, core.handle()).unwrap();
+
+    // Make the two interfaces for sending and receiving independent
+    // of each other so we can move one into a closure.
+    println!("Got it: {}", component.jid);
+    let (sink, stream) = component.split();
+    // Wrap sink in Option so that we can take() it for the send(self)
+    // to consume and return it back when ready.
+    let mut sink = Some(sink);
+    let mut send = move |stanza| {
+        sink = Some(
+            sink.take().
+                expect("sink")
+                .send(stanza)
+                .wait()
+                .expect("sink.send")
+        );
+    };
+    // Main loop, processes events
+    let done = stream.for_each(|event| {
+        if event.is_online() {
+            println!("Online!");
+
+            let presence = make_presence(Jid::from_str("test@component.linkmauve.fr/coucou").unwrap(), Jid::from_str("linkmauve@linkmauve.fr").unwrap());
+            send(presence);
+        } else if let Some(message) = event.into_stanza()
+            .and_then(|stanza| Message::try_from(stanza).ok())
+        {
+            // This is a message we'll echo
+            match (message.from, message.bodies.get("")) {
+                (Some(from), Some(body)) =>
+                    if message.type_ != MessageType::Error {
+                        let reply = make_reply(from, body);
+                        send(reply);
+                    },
+                _ => (),
+            }
+        }
+
+        Box::new(future::ok(()))
+    });
+
+    // Start polling `done`
+    match core.run(done) {
+        Ok(_) => (),
+        Err(e) => {
+            println!("Fatal: {}", e);
+            ()
+        }
+    }
+}
+
+// Construct a <presence/>
+fn make_presence(from: Jid, to: Jid) -> Element {
+    let mut presence = Presence::new(PresenceType::None);
+    presence.from = Some(from);
+    presence.to = Some(to);
+    presence.show = PresenceShow::Chat;
+    presence.statuses.insert(String::from("en"), String::from("Echoing messages."));
+    presence.into()
+}
+
+// Construct a chat <message/>
+fn make_reply(to: Jid, body: &str) -> Element {
+    let mut message = Message::new(Some(to));
+    message.bodies.insert(String::new(), body.to_owned());
+    message.into()
+}

src/component/auth.rs 🔗

@@ -0,0 +1,101 @@
+use std::mem::replace;
+use futures::{Future, Poll, Async, sink, Sink, Stream};
+use tokio_io::{AsyncRead, AsyncWrite};
+use minidom::Element;
+use sha_1::{Sha1, Digest};
+
+use xmpp_codec::Packet;
+use xmpp_stream::XMPPStream;
+
+const NS_JABBER_COMPONENT_ACCEPT: &str = "jabber:component:accept";
+
+pub struct ComponentAuth<S: AsyncWrite> {
+    state: ComponentAuthState<S>,
+}
+
+enum ComponentAuthState<S: AsyncWrite> {
+    WaitSend(sink::Send<XMPPStream<S>>),
+    WaitRecv(XMPPStream<S>),
+    Invalid,
+}
+
+impl<S: AsyncWrite> ComponentAuth<S> {
+    pub fn new(stream: XMPPStream<S>, password: String) -> Result<Self, String> {
+        // FIXME: huge hack, shouldn’t be an element!
+        let sid = stream.stream_features.name().to_owned();
+        let mut this = ComponentAuth {
+            state: ComponentAuthState::Invalid,
+        };
+        this.send(
+            stream,
+            "handshake",
+            // TODO: sha1(sid + password)
+            &format!("{:x}", Sha1::digest((sid + &password).as_bytes()))
+        );
+        return Ok(this);
+    }
+
+    fn send(&mut self, stream: XMPPStream<S>, nonza_name: &str, handshake: &str) {
+        let nonza = Element::builder(nonza_name)
+            .ns(NS_JABBER_COMPONENT_ACCEPT)
+            .append(handshake)
+            .build();
+
+        let send = stream.send(Packet::Stanza(nonza));
+
+        self.state = ComponentAuthState::WaitSend(send);
+    }
+}
+
+impl<S: AsyncRead + AsyncWrite> Future for ComponentAuth<S> {
+    type Item = XMPPStream<S>;
+    type Error = String;
+
+    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+        let state = replace(&mut self.state, ComponentAuthState::Invalid);
+
+        match state {
+            ComponentAuthState::WaitSend(mut send) =>
+                match send.poll() {
+                    Ok(Async::Ready(stream)) => {
+                        self.state = ComponentAuthState::WaitRecv(stream);
+                        self.poll()
+                    },
+                    Ok(Async::NotReady) => {
+                        self.state = ComponentAuthState::WaitSend(send);
+                        Ok(Async::NotReady)
+                    },
+                    Err(e) =>
+                        Err(format!("{}", e)),
+                },
+            ComponentAuthState::WaitRecv(mut stream) =>
+                match stream.poll() {
+                    Ok(Async::Ready(Some(Packet::Stanza(ref stanza))))
+                        if stanza.name() == "handshake"
+                        && stanza.ns() == Some(NS_JABBER_COMPONENT_ACCEPT) =>
+                    {
+                        self.state = ComponentAuthState::Invalid;
+                        Ok(Async::Ready(stream))
+                    },
+                    Ok(Async::Ready(Some(Packet::Stanza(ref stanza))))
+                        if stanza.is("error", "http://etherx.jabber.org/streams") =>
+                    {
+                        let e = "Authentication failure";
+                        Err(e.to_owned())
+                    },
+                    Ok(Async::Ready(event)) => {
+                        println!("ComponentAuth ignore {:?}", event);
+                        Ok(Async::NotReady)
+                    },
+                    Ok(_) => {
+                        self.state = ComponentAuthState::WaitRecv(stream);
+                        Ok(Async::NotReady)
+                    },
+                    Err(e) =>
+                        Err(format!("{}", e)),
+                },
+            ComponentAuthState::Invalid =>
+                unreachable!(),
+        }
+    }
+}

src/component/event.rs 🔗

@@ -0,0 +1,38 @@
+use minidom::Element;
+
+#[derive(Debug)]
+pub enum Event {
+    Online,
+    Disconnected,
+    Stanza(Element),
+}
+
+impl Event {
+    pub fn is_online(&self) -> bool {
+        match *self {
+            Event::Online => true,
+            _ => false,
+        }
+    }
+
+    pub fn is_stanza(&self, name: &str) -> bool {
+        match *self {
+            Event::Stanza(ref stanza) => stanza.name() == name,
+            _ => false,
+        }
+    }
+
+    pub fn as_stanza(&self) -> Option<&Element> {
+        match *self {
+            Event::Stanza(ref stanza) => Some(stanza),
+            _ => None,
+        }
+    }
+
+    pub fn into_stanza(self) -> Option<Element> {
+        match self {
+            Event::Stanza(stanza) => Some(stanza),
+            _ => None,
+        }
+    }
+}

src/component/mod.rs 🔗

@@ -0,0 +1,154 @@
+use std::mem::replace;
+use std::str::FromStr;
+use std::error::Error;
+use tokio_core::reactor::Handle;
+use tokio_core::net::TcpStream;
+use tokio_io::{AsyncRead, AsyncWrite};
+use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink};
+use minidom::Element;
+use jid::{Jid, JidParseError};
+
+use super::xmpp_codec::Packet;
+use super::xmpp_stream;
+use super::happy_eyeballs::Connecter;
+
+mod auth;
+use self::auth::ComponentAuth;
+mod event;
+pub use self::event::Event as ComponentEvent;
+
+pub struct Component {
+    pub jid: Jid,
+    state: ComponentState,
+}
+
+type XMPPStream = xmpp_stream::XMPPStream<TcpStream>;
+const NS_JABBER_COMPONENT_ACCEPT: &str = "jabber:component:accept";
+
+enum ComponentState {
+    Invalid,
+    Disconnected,
+    Connecting(Box<Future<Item=XMPPStream, Error=String>>),
+    Connected(XMPPStream),
+}
+
+impl Component {
+    pub fn new(jid: &str, password: &str, server: &str, port: u16, handle: Handle) -> Result<Self, JidParseError> {
+        let jid = try!(Jid::from_str(jid));
+        let password = password.to_owned();
+        let connect = Self::make_connect(jid.clone(), password, server, port, handle);
+        Ok(Component {
+            jid,
+            state: ComponentState::Connecting(connect),
+        })
+    }
+
+    fn make_connect(jid: Jid, password: String, server: &str, port: u16, handle: Handle) -> Box<Future<Item=XMPPStream, Error=String>> {
+        let jid1 = jid.clone();
+        let password = password;
+        Box::new(
+            Connecter::from_lookup(handle, server, "_xmpp-component._tcp", port)
+                .expect("Connector::from_lookup")
+                .and_then(move |tcp_stream| {
+                    xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_COMPONENT_ACCEPT.to_owned())
+                    .map_err(|e| format!("{}", e))
+                }).and_then(move |xmpp_stream| {
+                    Self::auth(xmpp_stream, password).expect("auth")
+                }).and_then(|xmpp_stream| {
+                    println!("Bound to {}", xmpp_stream.jid);
+                    Ok(xmpp_stream)
+                })
+        )
+    }
+
+    fn auth<S: AsyncRead + AsyncWrite>(stream: xmpp_stream::XMPPStream<S>, password: String) -> Result<ComponentAuth<S>, String> {
+        ComponentAuth::new(stream, password)
+    }
+}
+
+impl Stream for Component {
+    type Item = ComponentEvent;
+    type Error = String;
+
+    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+        let state = replace(&mut self.state, ComponentState::Invalid);
+
+        match state {
+            ComponentState::Invalid =>
+                Err("invalid client state".to_owned()),
+            ComponentState::Disconnected =>
+                Ok(Async::Ready(None)),
+            ComponentState::Connecting(mut connect) => {
+                match connect.poll() {
+                    Ok(Async::Ready(stream)) => {
+                        self.state = ComponentState::Connected(stream);
+                        Ok(Async::Ready(Some(ComponentEvent::Online)))
+                    },
+                    Ok(Async::NotReady) => {
+                        self.state = ComponentState::Connecting(connect);
+                        Ok(Async::NotReady)
+                    },
+                    Err(e) =>
+                        Err(e),
+                }
+            },
+            ComponentState::Connected(mut stream) => {
+                match stream.poll() {
+                    Ok(Async::NotReady) => {
+                        self.state = ComponentState::Connected(stream);
+                        Ok(Async::NotReady)
+                    },
+                    Ok(Async::Ready(None)) => {
+                        // EOF
+                        self.state = ComponentState::Disconnected;
+                        Ok(Async::Ready(Some(ComponentEvent::Disconnected)))
+                    },
+                    Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => {
+                        self.state = ComponentState::Connected(stream);
+                        Ok(Async::Ready(Some(ComponentEvent::Stanza(stanza))))
+                    },
+                    Ok(Async::Ready(_)) => {
+                        self.state = ComponentState::Connected(stream);
+                        Ok(Async::NotReady)
+                    },
+                    Err(e) =>
+                        Err(e.description().to_owned()),
+                }
+            },
+        }
+    }
+}
+
+impl Sink for Component {
+    type SinkItem = Element;
+    type SinkError = String;
+
+    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
+        match self.state {
+            ComponentState::Connected(ref mut stream) =>
+                match stream.start_send(Packet::Stanza(item)) {
+                    Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) =>
+                        Ok(AsyncSink::NotReady(stanza)),
+                    Ok(AsyncSink::NotReady(_)) =>
+                        panic!("Component.start_send with stanza but got something else back"),
+                    Ok(AsyncSink::Ready) => {
+                        Ok(AsyncSink::Ready)
+                    },
+                    Err(e) =>
+                        Err(e.description().to_owned()),
+                },
+            _ =>
+                Ok(AsyncSink::NotReady(item)),
+        }
+    }
+
+    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+        match &mut self.state {
+            &mut ComponentState::Connected(ref mut stream) =>
+                stream.poll_complete()
+                .map_err(|e| e.description().to_owned()),
+            _ =>
+                Ok(Async::Ready(())),
+        }
+    }
+}

src/lib.rs 🔗

@@ -12,6 +12,7 @@ extern crate rustc_serialize as serialize;
 extern crate jid;
 extern crate domain;
 extern crate idna;
+extern crate sha_1;
 
 pub mod xmpp_codec;
 pub mod xmpp_stream;
@@ -21,3 +22,5 @@ pub use starttls::StartTlsClient;
 mod happy_eyeballs;
 mod client;
 pub use client::{Client, ClientEvent};
+mod component;
+pub use component::{Component, ComponentEvent};

src/stream_start.rs 🔗

@@ -4,6 +4,7 @@ use futures::{Future, Async, Poll, Stream, sink, Sink};
 use tokio_io::{AsyncRead, AsyncWrite};
 use tokio_io::codec::Framed;
 use jid::Jid;
+use minidom::Element;
 
 use xmpp_codec::{XMPPCodec, Packet};
 use xmpp_stream::XMPPStream;
@@ -63,14 +64,25 @@ impl<S: AsyncRead + AsyncWrite> Future for StreamStart<S> {
             StreamStartState::RecvStart(mut stream) =>
                 match stream.poll() {
                     Ok(Async::Ready(Some(Packet::StreamStart(stream_attrs)))) => {
-                        retry = true;
                         let stream_ns = match stream_attrs.get("xmlns") {
                             Some(ns) => ns.clone(),
                             None =>
                                 return Err(Error::from(ErrorKind::InvalidData)),
                         };
-                        // TODO: skip RecvFeatures for version < 1.0
-                        (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady))
+                        if self.ns == "jabber:client" {
+                            retry = true;
+                            // TODO: skip RecvFeatures for version < 1.0
+                            (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady))
+                        } else {
+                            let id = match stream_attrs.get("id") {
+                                Some(id) => id.clone(),
+                                None =>
+                                    return Err(Error::from(ErrorKind::InvalidData)),
+                            };
+                                                                                                    // FIXME: huge hack, shouldn’t be an element!
+                            let stream = XMPPStream::new(self.jid.clone(), stream, self.ns.clone(), Element::builder(id).build());
+                            (StreamStartState::Invalid, Ok(Async::Ready(stream)))
+                        }
                     },
                     Ok(Async::Ready(_)) =>
                         return Err(Error::from(ErrorKind::InvalidData)),