Merge branch 'component' into 'master'

lumi created

Add the possibility to write components

See merge request !3

Change summary

examples/component.rs |  27 +++++++
src/component.rs      | 168 +++++++++++++++++++++++++++++++++++++++++++++
src/connection.rs     |  23 ++++++
src/lib.rs            |   1 
src/ns.rs             |   1 
src/transport.rs      |  79 ++++++++++++++++++++
6 files changed, 298 insertions(+), 1 deletion(-)

Detailed changes

examples/component.rs πŸ”—

@@ -0,0 +1,27 @@
+extern crate xmpp;
+
+use xmpp::jid::Jid;
+use xmpp::component::ComponentBuilder;
+use xmpp::plugins::messaging::{MessagingPlugin, MessageEvent};
+
+use std::env;
+
+fn main() {
+    let jid: Jid = env::var("JID").unwrap().parse().unwrap();
+    let pass = env::var("PASS").unwrap();
+    let host = env::var("HOST").unwrap();
+    let port: u16 = env::var("PORT").unwrap().parse().unwrap();
+    let mut component = ComponentBuilder::new(jid.clone())
+                                         .password(pass)
+                                         .host(host)
+                                         .port(port)
+                                         .connect()
+                                         .unwrap();
+    component.register_plugin(MessagingPlugin::new());
+    loop {
+        let event = component.next_event().unwrap();
+        if let Some(evt) = event.downcast::<MessageEvent>() {
+            println!("{:?}", evt);
+        }
+    }
+}

src/component.rs πŸ”—

@@ -0,0 +1,168 @@
+use jid::Jid;
+use transport::{Transport, PlainTransport};
+use error::Error;
+use ns;
+use plugin::{Plugin, PluginProxyBinding};
+use event::AbstractEvent;
+use connection::{Connection, Component2S};
+use openssl::hash::{hash, MessageDigest};
+
+use minidom::Element;
+
+use xml::reader::XmlEvent as ReaderEvent;
+
+use std::sync::mpsc::{Receiver, channel};
+
+/// A builder for `Component`s.
+pub struct ComponentBuilder {
+    jid: Jid,
+    secret: String,
+    host: Option<String>,
+    port: u16,
+}
+
+impl ComponentBuilder {
+    /// Creates a new builder for an XMPP component that will connect to `jid` with default parameters.
+    pub fn new(jid: Jid) -> ComponentBuilder {
+        ComponentBuilder {
+            jid: jid,
+            secret: "".to_owned(),
+            host: None,
+            port: 5347,
+        }
+    }
+
+    /// Sets the host to connect to.
+    pub fn host(mut self, host: String) -> ComponentBuilder {
+        self.host = Some(host);
+        self
+    }
+
+    /// Sets the port to connect to.
+    pub fn port(mut self, port: u16) -> ComponentBuilder {
+        self.port = port;
+        self
+    }
+
+    /// Sets the password to use.
+    pub fn password<P: Into<String>>(mut self, password: P) -> ComponentBuilder {
+        self.secret = password.into();
+        self
+    }
+
+    /// Connects to the server and returns a `Component` when succesful.
+    pub fn connect(self) -> Result<Component, Error> {
+        let host = &self.host.unwrap_or(self.jid.domain.clone());
+        let mut transport = PlainTransport::connect(host, self.port)?;
+        Component2S::init(&mut transport, &self.jid.domain, "stream_opening")?;
+        let (sender_out, sender_in) = channel();
+        let (dispatcher_out, dispatcher_in) = channel();
+        let mut component = Component {
+            jid: self.jid,
+            transport: transport,
+            plugins: Vec::new(),
+            binding: PluginProxyBinding::new(sender_out, dispatcher_out),
+            sender_in: sender_in,
+            dispatcher_in: dispatcher_in,
+        };
+        component.connect(self.secret)?;
+        Ok(component)
+    }
+}
+
+/// An XMPP component.
+pub struct Component {
+    jid: Jid,
+    transport: PlainTransport,
+    plugins: Vec<Box<Plugin>>,
+    binding: PluginProxyBinding,
+    sender_in: Receiver<Element>,
+    dispatcher_in: Receiver<AbstractEvent>,
+}
+
+impl Component {
+    /// Returns a reference to the `Jid` associated with this `Component`.
+    pub fn jid(&self) -> &Jid {
+        &self.jid
+    }
+
+    /// Registers a plugin.
+    pub fn register_plugin<P: Plugin + 'static>(&mut self, mut plugin: P) {
+        plugin.bind(self.binding.clone());
+        self.plugins.push(Box::new(plugin));
+    }
+
+    /// Returns the plugin given by the type parameter, if it exists, else panics.
+    pub fn plugin<P: Plugin>(&self) -> &P {
+        for plugin in &self.plugins {
+            let any = plugin.as_any();
+            if let Some(ret) = any.downcast_ref::<P>() {
+                return ret;
+            }
+        }
+        panic!("plugin does not exist!");
+    }
+
+    /// Returns the next event and flush the send queue.
+    pub fn next_event(&mut self) -> Result<AbstractEvent, Error> {
+        self.flush_send_queue()?;
+        loop {
+            if let Ok(evt) = self.dispatcher_in.try_recv() {
+                return Ok(evt);
+            }
+            let elem = self.transport.read_element()?;
+            for plugin in self.plugins.iter_mut() {
+                plugin.handle(&elem);
+                // TODO: handle plugin return
+            }
+            self.flush_send_queue()?;
+        }
+    }
+
+    /// Flushes the send queue, sending all queued up stanzas.
+    pub fn flush_send_queue(&mut self) -> Result<(), Error> { // TODO: not sure how great of an
+                                                              //       idea it is to flush in this
+                                                              //       manner…
+        while let Ok(elem) = self.sender_in.try_recv() {
+            self.transport.write_element(&elem)?;
+        }
+        Ok(())
+    }
+
+    fn connect(&mut self, secret: String) -> Result<(), Error> {
+        // TODO: this is very ugly
+        let mut sid = String::new();
+        loop {
+            let e = self.transport.read_event()?;
+            match e {
+                ReaderEvent::StartElement { attributes, .. } => {
+                    for attribute in attributes {
+                        if attribute.name.namespace == None && attribute.name.local_name == "id" {
+                            sid = attribute.value;
+                        }
+                    }
+                    break;
+                },
+                _ => (),
+            }
+        }
+        let concatenated = format!("{}{}", sid, secret);
+        let hash = hash(MessageDigest::sha1(), concatenated.as_bytes())?;
+        let mut handshake = String::new();
+        for byte in hash {
+            // TODO: probably terrible perfs!
+            handshake = format!("{}{:x}", handshake, byte);
+        }
+        let mut elem = Element::builder("handshake")
+                               .ns(ns::COMPONENT_ACCEPT)
+                               .build();
+        elem.append_text_node(handshake);
+        self.transport.write_element(&elem)?;
+        loop {
+            let n = self.transport.read_element()?;
+            if n.is("handshake", ns::COMPONENT_ACCEPT) {
+                return Ok(());
+            }
+        }
+    }
+}

src/connection.rs πŸ”—

@@ -36,3 +36,26 @@ impl Connection for C2S {
         Ok(())
     }
 }
+
+pub struct Component2S;
+
+impl Connection for Component2S {
+    type InitError = Error;
+    type CloseError = Error;
+
+    fn namespace() -> &'static str { ns::COMPONENT_ACCEPT }
+
+    fn init<T: Transport>(transport: &mut T, domain: &str, id: &str) -> Result<(), Error> {
+        transport.write_event(WriterEvent::start_element("stream:stream")
+                                          .attr("to", domain)
+                                          .attr("id", id)
+                                          .default_ns(ns::COMPONENT_ACCEPT)
+                                          .ns("stream", ns::STREAM))?;
+        Ok(())
+    }
+
+    fn close<T: Transport>(transport: &mut T) -> Result<(), Error> {
+        transport.write_event(WriterEvent::end_element())?;
+        Ok(())
+    }
+}

src/lib.rs πŸ”—

@@ -9,6 +9,7 @@ pub mod ns;
 pub mod transport;
 pub mod error;
 pub mod client;
+pub mod component;
 pub mod plugin;
 pub mod event;
 pub mod plugins;

src/ns.rs πŸ”—

@@ -1,6 +1,7 @@
 //! Provides constants for namespaces.
 
 pub const CLIENT: &'static str = "jabber:client";
+pub const COMPONENT_ACCEPT: &'static str = "jabber:component:accept";
 pub const STREAM: &'static str = "http://etherx.jabber.org/streams";
 pub const TLS: &'static str = "urn:ietf:params:xml:ns:xmpp-tls";
 pub const SASL: &'static str = "urn:ietf:params:xml:ns:xmpp-sasl";

src/transport.rs πŸ”—

@@ -2,7 +2,7 @@
 
 use std::io::prelude::*;
 
-use std::net::TcpStream;
+use std::net::{TcpStream, Shutdown};
 
 use xml::reader::{EventReader, XmlEvent as XmlReaderEvent};
 use xml::writer::{EventWriter, XmlEvent as XmlWriterEvent, EmitterConfig};
@@ -45,6 +45,83 @@ pub trait Transport {
     }
 }
 
+/// A plain text transport, completely unencrypted.
+pub struct PlainTransport {
+    inner: Arc<Mutex<TcpStream>>, // TODO: this feels rather ugly
+    reader: EventReader<LockedIO<TcpStream>>, // TODO: especially feels ugly because
+                                              //       this read would keep the lock
+                                              //       held very long (potentially)
+    writer: EventWriter<LockedIO<TcpStream>>,
+}
+
+impl Transport for PlainTransport {
+    fn write_event<'a, E: Into<XmlWriterEvent<'a>>>(&mut self, event: E) -> Result<(), Error> {
+        Ok(self.writer.write(event)?)
+    }
+
+    fn read_event(&mut self) -> Result<XmlReaderEvent, Error> {
+        Ok(self.reader.next()?)
+    }
+
+    fn write_element(&mut self, element: &minidom::Element) -> Result<(), Error> {
+        println!("SENT: {:?}", element);
+        Ok(element.write_to(&mut self.writer)?)
+    }
+
+    fn read_element(&mut self) -> Result<minidom::Element, Error> {
+        let element = minidom::Element::from_reader(&mut self.reader)?;
+        println!("RECV: {:?}", element);
+        Ok(element)
+    }
+
+    fn reset_stream(&mut self) {
+        let locked_io = LockedIO::from(self.inner.clone());
+        self.reader = EventReader::new(locked_io.clone());
+        self.writer = EventWriter::new_with_config(locked_io, EmitterConfig {
+            line_separator: "".into(),
+            perform_indent: false,
+            normalize_empty_elements: false,
+            .. Default::default()
+        });
+    }
+
+    fn channel_bind(&self) -> ChannelBinding {
+        // TODO: channel binding
+        ChannelBinding::None
+    }
+}
+
+impl PlainTransport {
+    /// Connects to a server without any encryption.
+    pub fn connect(host: &str, port: u16) -> Result<PlainTransport, Error> {
+        let tcp_stream = TcpStream::connect((host, port))?;
+        let parser = EventReader::new(tcp_stream);
+        let parser_stream = parser.into_inner();
+        let stream = Arc::new(Mutex::new(parser_stream));
+        let locked_io = LockedIO::from(stream.clone());
+        let reader = EventReader::new(locked_io.clone());
+        let writer = EventWriter::new_with_config(locked_io, EmitterConfig {
+            line_separator: "".into(),
+            perform_indent: false,
+            normalize_empty_elements: false,
+            .. Default::default()
+        });
+        Ok(PlainTransport {
+            inner: stream,
+            reader: reader,
+            writer: writer,
+        })
+    }
+
+    /// Closes the stream.
+    pub fn close(&mut self) {
+        self.inner.lock()
+                  .unwrap()
+                  .shutdown(Shutdown::Both)
+                  .unwrap(); // TODO: safety, return value and such
+    }
+}
+
 /// A transport which uses STARTTLS.
 pub struct SslTransport {
     inner: Arc<Mutex<SslStream<TcpStream>>>, // TODO: this feels rather ugly