From c0f3fc4afbb78092cf48d1c62d3dd67de5ddc145 Mon Sep 17 00:00:00 2001 From: Emmanuel Gil Peyrot Date: Sat, 22 Jul 2017 01:59:51 +0100 Subject: [PATCH] lib: add a component connection method --- Cargo.toml | 2 + examples/echo_component.rs | 103 +++++++++++++++++++++++++ src/component/auth.rs | 101 ++++++++++++++++++++++++ src/component/event.rs | 38 +++++++++ src/component/mod.rs | 154 +++++++++++++++++++++++++++++++++++++ src/lib.rs | 3 + src/stream_start.rs | 18 ++++- 7 files changed, 416 insertions(+), 3 deletions(-) create mode 100644 examples/echo_component.rs create mode 100644 src/component/auth.rs create mode 100644 src/component/event.rs create mode 100644 src/component/mod.rs diff --git a/Cargo.toml b/Cargo.toml index aacdbc4c0f9bcbd02d51ac8a6e8fb5215fd930f6..560305c8038cdd26bf5bc8057a3d1b0b265013fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,3 +19,5 @@ jid = "*" domain = "0.2.1" xmpp-parsers = "0.6.0" idna = "*" +try_from = "0.2.2" +sha-1 = "0.4.1" diff --git a/examples/echo_component.rs b/examples/echo_component.rs new file mode 100644 index 0000000000000000000000000000000000000000..7ea1b7dd7577fa8b21a5f85d833ca45471b57afd --- /dev/null +++ b/examples/echo_component.rs @@ -0,0 +1,103 @@ +extern crate futures; +extern crate tokio_core; +extern crate tokio_xmpp; +extern crate jid; +extern crate minidom; +extern crate xmpp_parsers; +extern crate try_from; + +use std::env::args; +use std::process::exit; +use std::str::FromStr; +use try_from::TryFrom; +use tokio_core::reactor::Core; +use futures::{Future, Stream, Sink, future}; +use tokio_xmpp::Component; +use minidom::Element; +use xmpp_parsers::presence::{Presence, Type as PresenceType, Show as PresenceShow}; +use xmpp_parsers::message::{Message, MessageType}; +use jid::Jid; + +fn main() { + let args: Vec = args().collect(); + if args.len() < 3 || args.len() > 5 { + println!("Usage: {} [server] [port]", args[0]); + exit(1); + } + let jid = &args[1]; + let password = &args[2]; + let server = &args.get(3).unwrap().parse().unwrap_or("127.0.0.1".to_owned()); + let port: u16 = args.get(4).unwrap().parse().unwrap_or(5347u16); + + // tokio_core context + let mut core = Core::new().unwrap(); + // Component instance + println!("{} {} {} {} {:?}", jid, password, server, port, core.handle()); + let component = Component::new(jid, password, server, port, core.handle()).unwrap(); + + // Make the two interfaces for sending and receiving independent + // of each other so we can move one into a closure. + println!("Got it: {}", component.jid); + let (sink, stream) = component.split(); + // Wrap sink in Option so that we can take() it for the send(self) + // to consume and return it back when ready. + let mut sink = Some(sink); + let mut send = move |stanza| { + sink = Some( + sink.take(). + expect("sink") + .send(stanza) + .wait() + .expect("sink.send") + ); + }; + // Main loop, processes events + let done = stream.for_each(|event| { + if event.is_online() { + println!("Online!"); + + let presence = make_presence(Jid::from_str("test@component.linkmauve.fr/coucou").unwrap(), Jid::from_str("linkmauve@linkmauve.fr").unwrap()); + send(presence); + } else if let Some(message) = event.into_stanza() + .and_then(|stanza| Message::try_from(stanza).ok()) + { + // This is a message we'll echo + match (message.from, message.bodies.get("")) { + (Some(from), Some(body)) => + if message.type_ != MessageType::Error { + let reply = make_reply(from, body); + send(reply); + }, + _ => (), + } + } + + Box::new(future::ok(())) + }); + + // Start polling `done` + match core.run(done) { + Ok(_) => (), + Err(e) => { + println!("Fatal: {}", e); + () + } + } +} + +// Construct a +fn make_presence(from: Jid, to: Jid) -> Element { + let mut presence = Presence::new(PresenceType::None); + presence.from = Some(from); + presence.to = Some(to); + presence.show = PresenceShow::Chat; + presence.statuses.insert(String::from("en"), String::from("Echoing messages.")); + presence.into() +} + +// Construct a chat +fn make_reply(to: Jid, body: &str) -> Element { + let mut message = Message::new(Some(to)); + message.bodies.insert(String::new(), body.to_owned()); + message.into() +} diff --git a/src/component/auth.rs b/src/component/auth.rs new file mode 100644 index 0000000000000000000000000000000000000000..773e453d68c2b26155698242d95c521068169d8a --- /dev/null +++ b/src/component/auth.rs @@ -0,0 +1,101 @@ +use std::mem::replace; +use futures::{Future, Poll, Async, sink, Sink, Stream}; +use tokio_io::{AsyncRead, AsyncWrite}; +use minidom::Element; +use sha_1::{Sha1, Digest}; + +use xmpp_codec::Packet; +use xmpp_stream::XMPPStream; + +const NS_JABBER_COMPONENT_ACCEPT: &str = "jabber:component:accept"; + +pub struct ComponentAuth { + state: ComponentAuthState, +} + +enum ComponentAuthState { + WaitSend(sink::Send>), + WaitRecv(XMPPStream), + Invalid, +} + +impl ComponentAuth { + pub fn new(stream: XMPPStream, password: String) -> Result { + // FIXME: huge hack, shouldn’t be an element! + let sid = stream.stream_features.name().to_owned(); + let mut this = ComponentAuth { + state: ComponentAuthState::Invalid, + }; + this.send( + stream, + "handshake", + // TODO: sha1(sid + password) + &format!("{:x}", Sha1::digest((sid + &password).as_bytes())) + ); + return Ok(this); + } + + fn send(&mut self, stream: XMPPStream, nonza_name: &str, handshake: &str) { + let nonza = Element::builder(nonza_name) + .ns(NS_JABBER_COMPONENT_ACCEPT) + .append(handshake) + .build(); + + let send = stream.send(Packet::Stanza(nonza)); + + self.state = ComponentAuthState::WaitSend(send); + } +} + +impl Future for ComponentAuth { + type Item = XMPPStream; + type Error = String; + + fn poll(&mut self) -> Poll { + let state = replace(&mut self.state, ComponentAuthState::Invalid); + + match state { + ComponentAuthState::WaitSend(mut send) => + match send.poll() { + Ok(Async::Ready(stream)) => { + self.state = ComponentAuthState::WaitRecv(stream); + self.poll() + }, + Ok(Async::NotReady) => { + self.state = ComponentAuthState::WaitSend(send); + Ok(Async::NotReady) + }, + Err(e) => + Err(format!("{}", e)), + }, + ComponentAuthState::WaitRecv(mut stream) => + match stream.poll() { + Ok(Async::Ready(Some(Packet::Stanza(ref stanza)))) + if stanza.name() == "handshake" + && stanza.ns() == Some(NS_JABBER_COMPONENT_ACCEPT) => + { + self.state = ComponentAuthState::Invalid; + Ok(Async::Ready(stream)) + }, + Ok(Async::Ready(Some(Packet::Stanza(ref stanza)))) + if stanza.is("error", "http://etherx.jabber.org/streams") => + { + let e = "Authentication failure"; + Err(e.to_owned()) + }, + Ok(Async::Ready(event)) => { + println!("ComponentAuth ignore {:?}", event); + Ok(Async::NotReady) + }, + Ok(_) => { + self.state = ComponentAuthState::WaitRecv(stream); + Ok(Async::NotReady) + }, + Err(e) => + Err(format!("{}", e)), + }, + ComponentAuthState::Invalid => + unreachable!(), + } + } +} diff --git a/src/component/event.rs b/src/component/event.rs new file mode 100644 index 0000000000000000000000000000000000000000..8ff44bd70a84ded7200eace6cea4055015a7e29a --- /dev/null +++ b/src/component/event.rs @@ -0,0 +1,38 @@ +use minidom::Element; + +#[derive(Debug)] +pub enum Event { + Online, + Disconnected, + Stanza(Element), +} + +impl Event { + pub fn is_online(&self) -> bool { + match *self { + Event::Online => true, + _ => false, + } + } + + pub fn is_stanza(&self, name: &str) -> bool { + match *self { + Event::Stanza(ref stanza) => stanza.name() == name, + _ => false, + } + } + + pub fn as_stanza(&self) -> Option<&Element> { + match *self { + Event::Stanza(ref stanza) => Some(stanza), + _ => None, + } + } + + pub fn into_stanza(self) -> Option { + match self { + Event::Stanza(stanza) => Some(stanza), + _ => None, + } + } +} diff --git a/src/component/mod.rs b/src/component/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..95790f000546e2e2a851583332411a90af179257 --- /dev/null +++ b/src/component/mod.rs @@ -0,0 +1,154 @@ +use std::mem::replace; +use std::str::FromStr; +use std::error::Error; +use tokio_core::reactor::Handle; +use tokio_core::net::TcpStream; +use tokio_io::{AsyncRead, AsyncWrite}; +use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink}; +use minidom::Element; +use jid::{Jid, JidParseError}; + +use super::xmpp_codec::Packet; +use super::xmpp_stream; +use super::happy_eyeballs::Connecter; + +mod auth; +use self::auth::ComponentAuth; +mod event; +pub use self::event::Event as ComponentEvent; + +pub struct Component { + pub jid: Jid, + state: ComponentState, +} + +type XMPPStream = xmpp_stream::XMPPStream; +const NS_JABBER_COMPONENT_ACCEPT: &str = "jabber:component:accept"; + +enum ComponentState { + Invalid, + Disconnected, + Connecting(Box>), + Connected(XMPPStream), +} + +impl Component { + pub fn new(jid: &str, password: &str, server: &str, port: u16, handle: Handle) -> Result { + let jid = try!(Jid::from_str(jid)); + let password = password.to_owned(); + let connect = Self::make_connect(jid.clone(), password, server, port, handle); + Ok(Component { + jid, + state: ComponentState::Connecting(connect), + }) + } + + fn make_connect(jid: Jid, password: String, server: &str, port: u16, handle: Handle) -> Box> { + let jid1 = jid.clone(); + let password = password; + Box::new( + Connecter::from_lookup(handle, server, "_xmpp-component._tcp", port) + .expect("Connector::from_lookup") + .and_then(move |tcp_stream| { + xmpp_stream::XMPPStream::start(tcp_stream, jid1, NS_JABBER_COMPONENT_ACCEPT.to_owned()) + .map_err(|e| format!("{}", e)) + }).and_then(move |xmpp_stream| { + Self::auth(xmpp_stream, password).expect("auth") + }).and_then(|xmpp_stream| { + println!("Bound to {}", xmpp_stream.jid); + Ok(xmpp_stream) + }) + ) + } + + fn auth(stream: xmpp_stream::XMPPStream, password: String) -> Result, String> { + ComponentAuth::new(stream, password) + } +} + +impl Stream for Component { + type Item = ComponentEvent; + type Error = String; + + fn poll(&mut self) -> Poll, Self::Error> { + let state = replace(&mut self.state, ComponentState::Invalid); + + match state { + ComponentState::Invalid => + Err("invalid client state".to_owned()), + ComponentState::Disconnected => + Ok(Async::Ready(None)), + ComponentState::Connecting(mut connect) => { + match connect.poll() { + Ok(Async::Ready(stream)) => { + self.state = ComponentState::Connected(stream); + Ok(Async::Ready(Some(ComponentEvent::Online))) + }, + Ok(Async::NotReady) => { + self.state = ComponentState::Connecting(connect); + Ok(Async::NotReady) + }, + Err(e) => + Err(e), + } + }, + ComponentState::Connected(mut stream) => { + match stream.poll() { + Ok(Async::NotReady) => { + self.state = ComponentState::Connected(stream); + Ok(Async::NotReady) + }, + Ok(Async::Ready(None)) => { + // EOF + self.state = ComponentState::Disconnected; + Ok(Async::Ready(Some(ComponentEvent::Disconnected))) + }, + Ok(Async::Ready(Some(Packet::Stanza(stanza)))) => { + self.state = ComponentState::Connected(stream); + Ok(Async::Ready(Some(ComponentEvent::Stanza(stanza)))) + }, + Ok(Async::Ready(_)) => { + self.state = ComponentState::Connected(stream); + Ok(Async::NotReady) + }, + Err(e) => + Err(e.description().to_owned()), + } + }, + } + } +} + +impl Sink for Component { + type SinkItem = Element; + type SinkError = String; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + match self.state { + ComponentState::Connected(ref mut stream) => + match stream.start_send(Packet::Stanza(item)) { + Ok(AsyncSink::NotReady(Packet::Stanza(stanza))) => + Ok(AsyncSink::NotReady(stanza)), + Ok(AsyncSink::NotReady(_)) => + panic!("Component.start_send with stanza but got something else back"), + Ok(AsyncSink::Ready) => { + Ok(AsyncSink::Ready) + }, + Err(e) => + Err(e.description().to_owned()), + }, + _ => + Ok(AsyncSink::NotReady(item)), + } + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + match &mut self.state { + &mut ComponentState::Connected(ref mut stream) => + stream.poll_complete() + .map_err(|e| e.description().to_owned()), + _ => + Ok(Async::Ready(())), + } + } +} diff --git a/src/lib.rs b/src/lib.rs index ce8ccb7d7ceca60bb53f4514a48b024eeda18cdf..81f3b1c6ac4d9ff63c6cc4796f50fad95d0a5e2d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ extern crate rustc_serialize as serialize; extern crate jid; extern crate domain; extern crate idna; +extern crate sha_1; pub mod xmpp_codec; pub mod xmpp_stream; @@ -21,3 +22,5 @@ pub use starttls::StartTlsClient; mod happy_eyeballs; mod client; pub use client::{Client, ClientEvent}; +mod component; +pub use component::{Component, ComponentEvent}; diff --git a/src/stream_start.rs b/src/stream_start.rs index b97e87d99d80ecbc734a1c848ab033180d2180a8..1d1813a383209784d6e367b0f17901527db3434a 100644 --- a/src/stream_start.rs +++ b/src/stream_start.rs @@ -4,6 +4,7 @@ use futures::{Future, Async, Poll, Stream, sink, Sink}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::Framed; use jid::Jid; +use minidom::Element; use xmpp_codec::{XMPPCodec, Packet}; use xmpp_stream::XMPPStream; @@ -63,14 +64,25 @@ impl Future for StreamStart { StreamStartState::RecvStart(mut stream) => match stream.poll() { Ok(Async::Ready(Some(Packet::StreamStart(stream_attrs)))) => { - retry = true; let stream_ns = match stream_attrs.get("xmlns") { Some(ns) => ns.clone(), None => return Err(Error::from(ErrorKind::InvalidData)), }; - // TODO: skip RecvFeatures for version < 1.0 - (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady)) + if self.ns == "jabber:client" { + retry = true; + // TODO: skip RecvFeatures for version < 1.0 + (StreamStartState::RecvFeatures(stream, stream_ns), Ok(Async::NotReady)) + } else { + let id = match stream_attrs.get("id") { + Some(id) => id.clone(), + None => + return Err(Error::from(ErrorKind::InvalidData)), + }; + // FIXME: huge hack, shouldn’t be an element! + let stream = XMPPStream::new(self.jid.clone(), stream, self.ns.clone(), Element::builder(id).build()); + (StreamStartState::Invalid, Ok(Async::Ready(stream))) + } }, Ok(Async::Ready(_)) => return Err(Error::from(ErrorKind::InvalidData)),