Merge branch 'new-event-system' into 'master'

lumi created

Adapt new event system for component

See merge request !8

Change summary

examples/component.rs |  9 ---
src/component.rs      | 97 ++++++++++++++++++++++++--------------------
src/lib.rs            |  2 
3 files changed, 54 insertions(+), 54 deletions(-)

Detailed changes

examples/component.rs πŸ”—

@@ -2,7 +2,6 @@ extern crate xmpp;
 
 use xmpp::jid::Jid;
 use xmpp::component::ComponentBuilder;
-use xmpp::plugins::messaging::{MessagingPlugin, MessageEvent};
 
 use std::env;
 
@@ -17,11 +16,5 @@ fn main() {
                                          .port(port)
                                          .connect()
                                          .unwrap();
-    component.register_plugin(MessagingPlugin::new());
-    loop {
-        let event = component.next_event().unwrap();
-        if let Some(evt) = event.downcast::<MessageEvent>() {
-            println!("{:?}", evt);
-        }
-    }
+    component.main().unwrap();
 }

src/component.rs πŸ”—

@@ -1,9 +1,10 @@
+use xml;
 use jid::Jid;
 use transport::{Transport, PlainTransport};
 use error::Error;
 use ns;
-use plugin::{Plugin, PluginProxyBinding};
-use event::AbstractEvent;
+use plugin::{Plugin, PluginInit, PluginProxyBinding};
+use event::{Dispatcher, ReceiveElement};
 use connection::{Connection, Component2S};
 use sha_1::{Sha1, Digest};
 
@@ -12,7 +13,11 @@ use minidom::Element;
 use xml::reader::XmlEvent as ReaderEvent;
 
 use std::fmt::Write;
-use std::sync::mpsc::{Receiver, channel};
+use std::sync::{Mutex, Arc};
+
+use std::collections::HashMap;
+
+use std::any::TypeId;
 
 /// A builder for `Component`s.
 pub struct ComponentBuilder {
@@ -56,15 +61,14 @@ impl ComponentBuilder {
         let host = &self.host.unwrap_or(self.jid.domain.clone());
         let mut transport = PlainTransport::connect(host, self.port)?;
         Component2S::init(&mut transport, &self.jid.domain, "stream_opening")?;
-        let (sender_out, sender_in) = channel();
-        let (dispatcher_out, dispatcher_in) = channel();
+        let dispatcher = Arc::new(Mutex::new(Dispatcher::new()));
+        let transport = Arc::new(Mutex::new(transport));
         let mut component = Component {
             jid: self.jid,
             transport: transport,
-            plugins: Vec::new(),
-            binding: PluginProxyBinding::new(sender_out, dispatcher_out),
-            sender_in: sender_in,
-            dispatcher_in: dispatcher_in,
+            plugins: HashMap::new(),
+            binding: PluginProxyBinding::new(dispatcher.clone()),
+            dispatcher: dispatcher,
         };
         component.connect(self.secret)?;
         Ok(component)
@@ -74,11 +78,10 @@ impl ComponentBuilder {
 /// An XMPP component.
 pub struct Component {
     jid: Jid,
-    transport: PlainTransport,
-    plugins: Vec<Box<Plugin>>,
+    transport: Arc<Mutex<PlainTransport>>,
+    plugins: HashMap<TypeId, Arc<Box<Plugin>>>,
     binding: PluginProxyBinding,
-    sender_in: Receiver<Element>,
-    dispatcher_in: Receiver<AbstractEvent>,
+    dispatcher: Arc<Mutex<Dispatcher>>,
 }
 
 impl Component {
@@ -88,53 +91,57 @@ impl Component {
     }
 
     /// Registers a plugin.
-    pub fn register_plugin<P: Plugin + 'static>(&mut self, mut plugin: P) {
-        plugin.bind(self.binding.clone());
-        self.plugins.push(Box::new(plugin));
+    pub fn register_plugin<P: Plugin + PluginInit + 'static>(&mut self, mut plugin: P) {
+        let binding = self.binding.clone();
+        plugin.bind(binding);
+        let p = Arc::new(Box::new(plugin) as Box<Plugin>);
+        {
+            let mut disp = self.dispatcher.lock().unwrap();
+            P::init(&mut disp, p.clone());
+        }
+        if self.plugins.insert(TypeId::of::<P>(), p).is_some() {
+            panic!("registering a plugin that's already registered");
+        }
     }
 
     /// Returns the plugin given by the type parameter, if it exists, else panics.
     pub fn plugin<P: Plugin>(&self) -> &P {
-        for plugin in &self.plugins {
-            let any = plugin.as_any();
-            if let Some(ret) = any.downcast_ref::<P>() {
-                return ret;
-            }
-        }
-        panic!("plugin does not exist!");
+        self.plugins.get(&TypeId::of::<P>())
+                    .expect("the requested plugin was not registered")
+                    .as_any()
+                    .downcast_ref::<P>()
+                    .expect("plugin downcast failure (should not happen!!)")
     }
 
     /// Returns the next event and flush the send queue.
-    pub fn next_event(&mut self) -> Result<AbstractEvent, Error> {
-        self.flush_send_queue()?;
+    pub fn main(&mut self) -> Result<(), Error> {
+        self.dispatcher.lock().unwrap().flush_all();
         loop {
-            if let Ok(evt) = self.dispatcher_in.try_recv() {
-                return Ok(evt);
+            let elem = self.read_element()?;
+            {
+                let mut disp = self.dispatcher.lock().unwrap();
+                disp.dispatch(ReceiveElement(elem));
+                disp.flush_all();
             }
-            let elem = self.transport.read_element()?;
-            for plugin in self.plugins.iter_mut() {
-                plugin.handle(&elem);
-                // TODO: handle plugin return
-            }
-            self.flush_send_queue()?;
         }
     }
 
-    /// Flushes the send queue, sending all queued up stanzas.
-    pub fn flush_send_queue(&mut self) -> Result<(), Error> { // TODO: not sure how great of an
-                                                              //       idea it is to flush in this
-                                                              //       manner…
-        while let Ok(elem) = self.sender_in.try_recv() {
-            self.transport.write_element(&elem)?;
-        }
-        Ok(())
+    fn read_element(&self) -> Result<Element, Error> {
+        self.transport.lock().unwrap().read_element()
+    }
+
+    fn write_element(&self, elem: &Element) -> Result<(), Error> {
+        self.transport.lock().unwrap().write_element(elem)
+    }
+
+    fn read_event(&self) -> Result<xml::reader::XmlEvent, Error> {
+        self.transport.lock().unwrap().read_event()
     }
 
     fn connect(&mut self, secret: String) -> Result<(), Error> {
-        // TODO: this is very ugly
         let mut sid = String::new();
         loop {
-            let e = self.transport.read_event()?;
+            let e = self.read_event()?;
             match e {
                 ReaderEvent::StartElement { attributes, .. } => {
                     for attribute in attributes {
@@ -158,9 +165,9 @@ impl Component {
                                .ns(ns::COMPONENT_ACCEPT)
                                .build();
         elem.append_text_node(handshake);
-        self.transport.write_element(&elem)?;
+        self.write_element(&elem)?;
         loop {
-            let n = self.transport.read_element()?;
+            let n = self.read_element()?;
             if n.is("handshake", ns::COMPONENT_ACCEPT) {
                 return Ok(());
             }

src/lib.rs πŸ”—

@@ -14,7 +14,7 @@ pub mod ns;
 pub mod transport;
 pub mod error;
 pub mod client;
-//pub mod component;
+pub mod component;
 pub mod plugin;
 #[macro_use] pub mod plugin_macro;
 pub mod event;