diff --git a/src/client.rs b/src/client.rs index 918ca88baba6cdb7276df81acc4a4c6a69b16770..88537e65b20a1357bd82a291011ce7f84eb51527 100644 --- a/src/client.rs +++ b/src/client.rs @@ -77,7 +77,7 @@ impl ClientBuilder { let host = &self.host.unwrap_or(self.jid.domain.clone()); let mut transport = SslTransport::connect(host, self.port)?; C2S::init(&mut transport, &self.jid.domain, "before_sasl")?; - let dispatcher = Arc::new(Mutex::new(Dispatcher::new())); + let dispatcher = Arc::new(Dispatcher::new()); let mut credentials = self.credentials; credentials.channel_binding = transport.channel_bind(); let transport = Arc::new(Mutex::new(transport)); @@ -88,7 +88,7 @@ impl ClientBuilder { binding: PluginProxyBinding::new(dispatcher.clone()), dispatcher: dispatcher, }; - client.dispatcher.lock().unwrap().register(Priority::Default, move |evt: &SendElement| { + client.dispatcher.register(Priority::Default, move |evt: &SendElement| { let mut t = transport.lock().unwrap(); t.write_element(&evt.0).unwrap(); Propagation::Continue @@ -105,7 +105,7 @@ pub struct Client { transport: Arc>, plugins: HashMap>>, binding: PluginProxyBinding, - dispatcher: Arc>, + dispatcher: Arc, } impl Client { @@ -119,10 +119,7 @@ impl Client { let binding = self.binding.clone(); plugin.bind(binding); let p = Arc::new(Box::new(plugin) as Box); - { - let mut disp = self.dispatcher.lock().unwrap(); - P::init(&mut disp, p.clone()); - } + P::init(&self.dispatcher, p.clone()); if self.plugins.insert(TypeId::of::

(), p).is_some() { panic!("registering a plugin that's already registered"); } @@ -132,7 +129,7 @@ impl Client { where E: Event, F: Fn(&E) -> Propagation + 'static { - self.dispatcher.lock().unwrap().register(pri, func); + self.dispatcher.register(pri, func); } /// Returns the plugin given by the type parameter, if it exists, else panics. @@ -146,14 +143,11 @@ impl Client { /// Returns the next event and flush the send queue. pub fn main(&mut self) -> Result<(), Error> { - self.dispatcher.lock().unwrap().flush_all(); + self.dispatcher.flush_all(); loop { let elem = self.read_element()?; - { - let mut disp = self.dispatcher.lock().unwrap(); - disp.dispatch(ReceiveElement(elem)); - disp.flush_all(); - } + self.dispatcher.dispatch(ReceiveElement(elem)); + self.dispatcher.flush_all(); } } diff --git a/src/component.rs b/src/component.rs index b04916ecf17f7790ed5bf1fa54980750b208cc65..3f7c61ef90b1b01e9f3bed26d7d5f597d636d5d1 100644 --- a/src/component.rs +++ b/src/component.rs @@ -4,7 +4,7 @@ use transport::{Transport, PlainTransport}; use error::Error; use ns; use plugin::{Plugin, PluginInit, PluginProxyBinding}; -use event::{Dispatcher, ReceiveElement}; +use event::{Dispatcher, ReceiveElement, SendElement, Propagation, Priority, Event}; use connection::{Connection, Component2S}; use sha_1::{Sha1, Digest}; @@ -61,15 +61,20 @@ impl ComponentBuilder { let host = &self.host.unwrap_or(self.jid.domain.clone()); let mut transport = PlainTransport::connect(host, self.port)?; Component2S::init(&mut transport, &self.jid.domain, "stream_opening")?; - let dispatcher = Arc::new(Mutex::new(Dispatcher::new())); + let dispatcher = Arc::new(Dispatcher::new()); let transport = Arc::new(Mutex::new(transport)); let mut component = Component { jid: self.jid, - transport: transport, + transport: transport.clone(), plugins: HashMap::new(), binding: PluginProxyBinding::new(dispatcher.clone()), dispatcher: dispatcher, }; + component.dispatcher.register(Priority::Default, move |evt: &SendElement| { + let mut t = transport.lock().unwrap(); + t.write_element(&evt.0).unwrap(); + Propagation::Continue + }); component.connect(self.secret)?; Ok(component) } @@ -81,7 +86,7 @@ pub struct Component { transport: Arc>, plugins: HashMap>>, binding: PluginProxyBinding, - dispatcher: Arc>, + dispatcher: Arc, } impl Component { @@ -95,10 +100,7 @@ impl Component { let binding = self.binding.clone(); plugin.bind(binding); let p = Arc::new(Box::new(plugin) as Box); - { - let mut disp = self.dispatcher.lock().unwrap(); - P::init(&mut disp, p.clone()); - } + P::init(&self.dispatcher, p.clone()); if self.plugins.insert(TypeId::of::

(), p).is_some() { panic!("registering a plugin that's already registered"); } @@ -113,16 +115,20 @@ impl Component { .expect("plugin downcast failure (should not happen!!)") } + pub fn register_handler(&mut self, pri: Priority, func: F) + where + E: Event, + F: Fn(&E) -> Propagation + 'static { + self.dispatcher.register(pri, func); + } + /// Returns the next event and flush the send queue. pub fn main(&mut self) -> Result<(), Error> { - self.dispatcher.lock().unwrap().flush_all(); + self.dispatcher.flush_all(); loop { let elem = self.read_element()?; - { - let mut disp = self.dispatcher.lock().unwrap(); - disp.dispatch(ReceiveElement(elem)); - disp.flush_all(); - } + self.dispatcher.dispatch(ReceiveElement(elem)); + self.dispatcher.flush_all(); } } diff --git a/src/event.rs b/src/event.rs index b9adebcd13583a84188d5c21ee445a24b244e9b9..35627213261d54fa523708d9d6b3994cd176eb28 100644 --- a/src/event.rs +++ b/src/event.rs @@ -4,6 +4,7 @@ use std::fmt::Debug; use std::collections::BTreeMap; use std::cmp::Ordering; use std::mem; +use std::sync::Mutex; use minidom::Element; @@ -72,21 +73,21 @@ pub enum Propagation { /// An event dispatcher, this takes care of dispatching events to their respective handlers. pub struct Dispatcher { - handlers: BTreeMap>>>, - queue: Vec<(TypeId, AbstractEvent)>, + handlers: Mutex>>>>, + queue: Mutex>, } impl Dispatcher { /// Create a new `Dispatcher`. pub fn new() -> Dispatcher { Dispatcher { - handlers: BTreeMap::new(), - queue: Vec::new(), + handlers: Mutex::new(BTreeMap::new()), + queue: Mutex::new(Vec::new()), } } /// Register an event handler. - pub fn register(&mut self, priority: Priority, func: F) + pub fn register(&self, priority: Priority, func: F) where E: Event, F: Fn(&E) -> Propagation + 'static { @@ -110,24 +111,28 @@ impl Dispatcher { func: func, _marker: PhantomData, }) as Box; - let ent = self.handlers.entry(TypeId::of::()) - .or_insert_with(|| Vec::new()); + let mut guard = self.handlers.lock().unwrap(); + let ent = guard.entry(TypeId::of::()) + .or_insert_with(|| Vec::new()); ent.push(Record(priority, handler)); ent.sort(); } /// Append an event to the queue. - pub fn dispatch(&mut self, event: E) where E: Event { - self.queue.push((TypeId::of::(), AbstractEvent::new(event))); + pub fn dispatch(&self, event: E) where E: Event { + self.queue.lock().unwrap().push((TypeId::of::(), AbstractEvent::new(event))); } /// Flush all events in the queue so they can be handled by their respective handlers. /// Returns whether there are still pending events. - pub fn flush(&mut self) -> bool { + pub fn flush(&self) -> bool { let mut q = Vec::new(); - mem::swap(&mut self.queue, &mut q); + { + let mut my_q = self.queue.lock().unwrap(); + mem::swap(my_q.as_mut(), &mut q); + } 'evts: for (t, evt) in q { - if let Some(handlers) = self.handlers.get_mut(&t) { + if let Some(handlers) = self.handlers.lock().unwrap().get_mut(&t) { for &mut Record(_, ref mut handler) in handlers { match handler.handle(&evt) { Propagation::Stop => { continue 'evts; }, @@ -136,12 +141,12 @@ impl Dispatcher { } } } - !self.queue.is_empty() + !self.queue.lock().unwrap().is_empty() } /// Flushes all events, like `flush`, but keeps doing this until there is nothing left in the /// queue. - pub fn flush_all(&mut self) { + pub fn flush_all(&self) { while self.flush() {} } } @@ -176,7 +181,7 @@ mod tests { #[test] #[should_panic(expected = "success")] fn test() { - let mut disp = Dispatcher::new(); + let disp = Dispatcher::new(); #[derive(Debug)] struct MyEvent { diff --git a/src/plugin.rs b/src/plugin.rs index d025edb2ea10bcb9372b3c749a902a0ca65f969d..295da16a26b909dc424efc2acd100d819f78ee1c 100644 --- a/src/plugin.rs +++ b/src/plugin.rs @@ -4,7 +4,7 @@ use event::{Event, Dispatcher, SendElement, Priority, Propagation}; use std::any::Any; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::mem; @@ -12,11 +12,11 @@ use minidom::Element; #[derive(Clone)] pub struct PluginProxyBinding { - dispatcher: Arc>, + dispatcher: Arc, } impl PluginProxyBinding { - pub fn new(dispatcher: Arc>) -> PluginProxyBinding { + pub fn new(dispatcher: Arc) -> PluginProxyBinding { PluginProxyBinding { dispatcher: dispatcher, } @@ -57,7 +57,7 @@ impl PluginProxy { pub fn dispatch(&self, event: E) { self.with_binding(move |binding| { // TODO: proper error handling - binding.dispatcher.lock().unwrap().dispatch(event); + binding.dispatcher.dispatch(event); }); } @@ -68,7 +68,7 @@ impl PluginProxy { F: Fn(&E) -> Propagation + 'static { self.with_binding(move |binding| { // TODO: proper error handling - binding.dispatcher.lock().unwrap().register(priority, func); + binding.dispatcher.register(priority, func); }); } @@ -90,7 +90,7 @@ pub trait Plugin: Any + PluginAny { } pub trait PluginInit { - fn init(dispatcher: &mut Dispatcher, me: Arc>); + fn init(dispatcher: &Dispatcher, me: Arc>); } pub trait PluginAny { diff --git a/src/plugin_macro.rs b/src/plugin_macro.rs index 687e2c4e671c38d92354a976c1336b5275d97250..2d8e03430950efa7da947f3c6b32dfea3169d636 100644 --- a/src/plugin_macro.rs +++ b/src/plugin_macro.rs @@ -9,7 +9,7 @@ macro_rules! impl_plugin { #[allow(unused_variables)] impl $crate::plugin::PluginInit for $plugin { - fn init( dispatcher: &mut $crate::event::Dispatcher + fn init( dispatcher: &$crate::event::Dispatcher , me: ::std::sync::Arc>) { $( let new_arc = me.clone();