resolve deadlock, fix component.rs

lumi created

Change summary

src/client.rs       | 22 ++++++++--------------
src/component.rs    | 34 ++++++++++++++++++++--------------
src/event.rs        | 35 ++++++++++++++++++++---------------
src/plugin.rs       | 12 ++++++------
src/plugin_macro.rs |  2 +-
5 files changed, 55 insertions(+), 50 deletions(-)

Detailed changes

src/client.rs 🔗

@@ -77,7 +77,7 @@ impl ClientBuilder {
         let host = &self.host.unwrap_or(self.jid.domain.clone());
         let mut transport = SslTransport::connect(host, self.port)?;
         C2S::init(&mut transport, &self.jid.domain, "before_sasl")?;
-        let dispatcher = Arc::new(Mutex::new(Dispatcher::new()));
+        let dispatcher = Arc::new(Dispatcher::new());
         let mut credentials = self.credentials;
         credentials.channel_binding = transport.channel_bind();
         let transport = Arc::new(Mutex::new(transport));
@@ -88,7 +88,7 @@ impl ClientBuilder {
             binding: PluginProxyBinding::new(dispatcher.clone()),
             dispatcher: dispatcher,
         };
-        client.dispatcher.lock().unwrap().register(Priority::Default, move |evt: &SendElement| {
+        client.dispatcher.register(Priority::Default, move |evt: &SendElement| {
             let mut t = transport.lock().unwrap();
             t.write_element(&evt.0).unwrap();
             Propagation::Continue
@@ -105,7 +105,7 @@ pub struct Client {
     transport: Arc<Mutex<SslTransport>>,
     plugins: HashMap<TypeId, Arc<Box<Plugin>>>,
     binding: PluginProxyBinding,
-    dispatcher: Arc<Mutex<Dispatcher>>,
+    dispatcher: Arc<Dispatcher>,
 }
 
 impl Client {
@@ -119,10 +119,7 @@ impl Client {
         let binding = self.binding.clone();
         plugin.bind(binding);
         let p = Arc::new(Box::new(plugin) as Box<Plugin>);
-        {
-            let mut disp = self.dispatcher.lock().unwrap();
-            P::init(&mut disp, p.clone());
-        }
+        P::init(&self.dispatcher, p.clone());
         if self.plugins.insert(TypeId::of::<P>(), p).is_some() {
             panic!("registering a plugin that's already registered");
         }
@@ -132,7 +129,7 @@ impl Client {
         where
             E: Event,
             F: Fn(&E) -> Propagation + 'static {
-        self.dispatcher.lock().unwrap().register(pri, func);
+        self.dispatcher.register(pri, func);
     }
 
     /// Returns the plugin given by the type parameter, if it exists, else panics.
@@ -146,14 +143,11 @@ impl Client {
 
     /// Returns the next event and flush the send queue.
     pub fn main(&mut self) -> Result<(), Error> {
-        self.dispatcher.lock().unwrap().flush_all();
+        self.dispatcher.flush_all();
         loop {
             let elem = self.read_element()?;
-            {
-                let mut disp = self.dispatcher.lock().unwrap();
-                disp.dispatch(ReceiveElement(elem));
-                disp.flush_all();
-            }
+            self.dispatcher.dispatch(ReceiveElement(elem));
+            self.dispatcher.flush_all();
         }
     }
 

src/component.rs 🔗

@@ -4,7 +4,7 @@ use transport::{Transport, PlainTransport};
 use error::Error;
 use ns;
 use plugin::{Plugin, PluginInit, PluginProxyBinding};
-use event::{Dispatcher, ReceiveElement};
+use event::{Dispatcher, ReceiveElement, SendElement, Propagation, Priority, Event};
 use connection::{Connection, Component2S};
 use sha_1::{Sha1, Digest};
 
@@ -61,15 +61,20 @@ impl ComponentBuilder {
         let host = &self.host.unwrap_or(self.jid.domain.clone());
         let mut transport = PlainTransport::connect(host, self.port)?;
         Component2S::init(&mut transport, &self.jid.domain, "stream_opening")?;
-        let dispatcher = Arc::new(Mutex::new(Dispatcher::new()));
+        let dispatcher = Arc::new(Dispatcher::new());
         let transport = Arc::new(Mutex::new(transport));
         let mut component = Component {
             jid: self.jid,
-            transport: transport,
+            transport: transport.clone(),
             plugins: HashMap::new(),
             binding: PluginProxyBinding::new(dispatcher.clone()),
             dispatcher: dispatcher,
         };
+        component.dispatcher.register(Priority::Default, move |evt: &SendElement| {
+            let mut t = transport.lock().unwrap();
+            t.write_element(&evt.0).unwrap();
+            Propagation::Continue
+        });
         component.connect(self.secret)?;
         Ok(component)
     }
@@ -81,7 +86,7 @@ pub struct Component {
     transport: Arc<Mutex<PlainTransport>>,
     plugins: HashMap<TypeId, Arc<Box<Plugin>>>,
     binding: PluginProxyBinding,
-    dispatcher: Arc<Mutex<Dispatcher>>,
+    dispatcher: Arc<Dispatcher>,
 }
 
 impl Component {
@@ -95,10 +100,7 @@ impl Component {
         let binding = self.binding.clone();
         plugin.bind(binding);
         let p = Arc::new(Box::new(plugin) as Box<Plugin>);
-        {
-            let mut disp = self.dispatcher.lock().unwrap();
-            P::init(&mut disp, p.clone());
-        }
+        P::init(&self.dispatcher, p.clone());
         if self.plugins.insert(TypeId::of::<P>(), p).is_some() {
             panic!("registering a plugin that's already registered");
         }
@@ -113,16 +115,20 @@ impl Component {
                     .expect("plugin downcast failure (should not happen!!)")
     }
 
+    pub fn register_handler<E, F>(&mut self, pri: Priority, func: F)
+        where
+            E: Event,
+            F: Fn(&E) -> Propagation + 'static {
+        self.dispatcher.register(pri, func);
+    }
+
     /// Returns the next event and flush the send queue.
     pub fn main(&mut self) -> Result<(), Error> {
-        self.dispatcher.lock().unwrap().flush_all();
+        self.dispatcher.flush_all();
         loop {
             let elem = self.read_element()?;
-            {
-                let mut disp = self.dispatcher.lock().unwrap();
-                disp.dispatch(ReceiveElement(elem));
-                disp.flush_all();
-            }
+            self.dispatcher.dispatch(ReceiveElement(elem));
+            self.dispatcher.flush_all();
         }
     }
 

src/event.rs 🔗

@@ -4,6 +4,7 @@ use std::fmt::Debug;
 use std::collections::BTreeMap;
 use std::cmp::Ordering;
 use std::mem;
+use std::sync::Mutex;
 
 use minidom::Element;
 
@@ -72,21 +73,21 @@ pub enum Propagation {
 
 /// An event dispatcher, this takes care of dispatching events to their respective handlers.
 pub struct Dispatcher {
-    handlers: BTreeMap<TypeId, Vec<Record<Priority, Box<EventHandler>>>>,
-    queue: Vec<(TypeId, AbstractEvent)>,
+    handlers: Mutex<BTreeMap<TypeId, Vec<Record<Priority, Box<EventHandler>>>>>,
+    queue: Mutex<Vec<(TypeId, AbstractEvent)>>,
 }
 
 impl Dispatcher {
     /// Create a new `Dispatcher`.
     pub fn new() -> Dispatcher {
         Dispatcher {
-            handlers: BTreeMap::new(),
-            queue: Vec::new(),
+            handlers: Mutex::new(BTreeMap::new()),
+            queue: Mutex::new(Vec::new()),
         }
     }
 
     /// Register an event handler.
-    pub fn register<E, F>(&mut self, priority: Priority, func: F)
+    pub fn register<E, F>(&self, priority: Priority, func: F)
         where
             E: Event,
             F: Fn(&E) -> Propagation + 'static {
@@ -110,24 +111,28 @@ impl Dispatcher {
             func: func,
             _marker: PhantomData,
         }) as Box<EventHandler>;
-        let ent = self.handlers.entry(TypeId::of::<E>())
-                               .or_insert_with(|| Vec::new());
+        let mut guard = self.handlers.lock().unwrap();
+        let ent = guard.entry(TypeId::of::<E>())
+                       .or_insert_with(|| Vec::new());
         ent.push(Record(priority, handler));
         ent.sort();
     }
 
     /// Append an event to the queue.
-    pub fn dispatch<E>(&mut self, event: E) where E: Event {
-        self.queue.push((TypeId::of::<E>(), AbstractEvent::new(event)));
+    pub fn dispatch<E>(&self, event: E) where E: Event {
+        self.queue.lock().unwrap().push((TypeId::of::<E>(), AbstractEvent::new(event)));
     }
 
     /// Flush all events in the queue so they can be handled by their respective handlers.
     /// Returns whether there are still pending events.
-    pub fn flush(&mut self) -> bool {
+    pub fn flush(&self) -> bool {
         let mut q = Vec::new();
-        mem::swap(&mut self.queue, &mut q);
+        {
+            let mut my_q = self.queue.lock().unwrap();
+            mem::swap(my_q.as_mut(), &mut q);
+        }
         'evts: for (t, evt) in q {
-            if let Some(handlers) = self.handlers.get_mut(&t) {
+            if let Some(handlers) = self.handlers.lock().unwrap().get_mut(&t) {
                 for &mut Record(_, ref mut handler) in handlers {
                     match handler.handle(&evt) {
                         Propagation::Stop => { continue 'evts; },
@@ -136,12 +141,12 @@ impl Dispatcher {
                 }
             }
         }
-        !self.queue.is_empty()
+        !self.queue.lock().unwrap().is_empty()
     }
 
     /// Flushes all events, like `flush`, but keeps doing this until there is nothing left in the
     /// queue.
-    pub fn flush_all(&mut self) {
+    pub fn flush_all(&self) {
         while self.flush() {}
     }
 }
@@ -176,7 +181,7 @@ mod tests {
     #[test]
     #[should_panic(expected = "success")]
     fn test() {
-        let mut disp = Dispatcher::new();
+        let disp = Dispatcher::new();
 
         #[derive(Debug)]
         struct MyEvent {

src/plugin.rs 🔗

@@ -4,7 +4,7 @@ use event::{Event, Dispatcher, SendElement, Priority, Propagation};
 
 use std::any::Any;
 
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 
 use std::mem;
 
@@ -12,11 +12,11 @@ use minidom::Element;
 
 #[derive(Clone)]
 pub struct PluginProxyBinding {
-    dispatcher: Arc<Mutex<Dispatcher>>,
+    dispatcher: Arc<Dispatcher>,
 }
 
 impl PluginProxyBinding {
-    pub fn new(dispatcher: Arc<Mutex<Dispatcher>>) -> PluginProxyBinding {
+    pub fn new(dispatcher: Arc<Dispatcher>) -> PluginProxyBinding {
         PluginProxyBinding {
             dispatcher: dispatcher,
         }
@@ -57,7 +57,7 @@ impl PluginProxy {
     pub fn dispatch<E: Event>(&self, event: E) {
         self.with_binding(move |binding| {
             // TODO: proper error handling
-            binding.dispatcher.lock().unwrap().dispatch(event);
+            binding.dispatcher.dispatch(event);
         });
     }
 
@@ -68,7 +68,7 @@ impl PluginProxy {
             F: Fn(&E) -> Propagation + 'static {
         self.with_binding(move |binding| {
             // TODO: proper error handling
-            binding.dispatcher.lock().unwrap().register(priority, func);
+            binding.dispatcher.register(priority, func);
         });
     }
 
@@ -90,7 +90,7 @@ pub trait Plugin: Any + PluginAny {
 }
 
 pub trait PluginInit {
-    fn init(dispatcher: &mut Dispatcher, me: Arc<Box<Plugin>>);
+    fn init(dispatcher: &Dispatcher, me: Arc<Box<Plugin>>);
 }
 
 pub trait PluginAny {

src/plugin_macro.rs 🔗

@@ -9,7 +9,7 @@ macro_rules! impl_plugin {
 
         #[allow(unused_variables)]
         impl $crate::plugin::PluginInit for $plugin {
-            fn init( dispatcher: &mut $crate::event::Dispatcher
+            fn init( dispatcher: &$crate::event::Dispatcher
                    , me: ::std::sync::Arc<Box<$crate::plugin::Plugin>>) {
                 $(
                     let new_arc = me.clone();