Parcourir la source

Use condition_variable in multithreading pdevs coordinator

Eric Ramat il y a 3 ans
Parent
commit
352d0d9ea0
1 fichiers modifiés avec 30 ajouts et 29 suppressions
  1. 30 29
      src/artis-star/kernel/pdevs/multithreading/Coordinator.hpp

+ 30 - 29
src/artis-star/kernel/pdevs/multithreading/Coordinator.hpp

@@ -60,26 +60,26 @@ template<class Time>
 struct done_start_message
 {
   explicit done_start_message(typename Time::type tn,
-                              common::Model<Time> *child)
+                              common::Model <Time> *child)
       :
       _tn(tn), _child(child)
   {}
 
   typename Time::type _tn;
-  common::Model<Time> *_child;
+  common::Model <Time> *_child;
 };
 
 template<class Time>
 struct done_transition_message
 {
   explicit done_transition_message(typename Time::type tn,
-                                   common::Model<Time> *child)
+                                   common::Model <Time> *child)
       :
       _tn(tn), _child(child)
   {}
 
   typename Time::type _tn;
-  common::Model<Time> *_child;
+  common::Model <Time> *_child;
 };
 
 template<class Time,
@@ -107,14 +107,13 @@ public:
           name, parameters, graph_parameters)
   {
     type::_graph_manager.init();
-    _thread = new std::thread([&] { loop(); });
+    _thread = std::make_shared<std::thread>([&] { loop(); });
   }
 
   virtual ~Coordinator()
   {
     done();
     _thread->join();
-    delete _thread;
   }
 
   void done()
@@ -129,36 +128,39 @@ public:
   void loop()
   {
     try {
-      for (;;) {
+      while (true) {
         _incoming.wait()
             .template handle<start_message_type>(
-                [&](start_message_type const &msg) {
+                [this](start_message_type const &msg) {
                   typename Time::type tn = start(msg._t);
                   _sender.send(done_start_message_type(tn, this));
                 })
             .
                 template handle<done_start_message_type>(
-                [&](done_start_message_type const &msg) {
+                [this](done_start_message_type const &msg) {
                   type::_event_table.init(msg._tn, msg._child);
                   --_received;
                   if (_received == 0) {
-                    _received_mutex.unlock();
+                    std::unique_lock <std::mutex> lock(_received_mutex);
+
+                    _condition.notify_one();
                   }
                 })
             .
                 template handle<transition_message_type>(
-                [&](transition_message_type const &msg) {
+                [this](transition_message_type const &msg) {
                   typename Time::type tn = transition(msg._t);
-                  _sender.send(done_transition_message_type(tn,
-                                                            this));
+                  _sender.send(done_transition_message_type(tn, this));
                 })
             .
                 template handle<done_transition_message_type>(
-                [&](done_transition_message_type const &msg) {
+                [this](done_transition_message_type const &msg) {
                   type::_event_table.put(msg._tn, msg._child);
                   --_received;
                   if (_received == 0) {
-                    _received_mutex.unlock();
+                    std::unique_lock <std::mutex> lock(_received_mutex);
+
+                    _condition.notify_one();
                   }
                 });
       }
@@ -179,10 +181,10 @@ public:
     }
 
     if (_received > 0) {
-      _received_mutex.lock();
-      type::_graph_manager.start(t);
+      std::unique_lock <std::mutex> lock(_received_mutex);
 
-      std::lock_guard<std::mutex> lock(_received_mutex);
+      type::_graph_manager.start(t);
+      _condition.wait(lock);
     }
 
     type::_tl = t;
@@ -200,11 +202,10 @@ public:
   {
     assert(t >= type::_tl and t <= type::_tn);
 
-    common::Models<Time> receivers = type::get_receivers();
-//                    common::Models<Time> IMM = type::_event_table.get_current_models(t);
-    common::Models<Time> IMM = type::_event_table.get_current_models(t,
-                                                                     type::_graph_manager
-                                                                         .lookahead(t));
+    common::Models <Time> receivers = type::get_receivers();
+//  common::Models<Time> IMM = type::_event_table.get_current_models(t);
+    common::Models <Time> IMM =
+        type::_event_table.get_current_models(t, type::_graph_manager.lookahead(t));
 
     _received = 0;
     for (auto &model : receivers) {
@@ -215,8 +216,7 @@ public:
       }
     }
     for (auto &model : IMM) {
-      if (std::find(receivers.begin(), receivers.end(),
-                    model) == receivers.end()) {
+      if (std::find(receivers.begin(), receivers.end(), model) == receivers.end()) {
         if (model->is_atomic()) {
           type::_event_table.put(model->transition(t), model);
         } else {
@@ -226,11 +226,11 @@ public:
     }
 
     if (_received > 0) {
-      _received_mutex.lock();
+      std::unique_lock <std::mutex> lock(_received_mutex);
+
       type::_graph_manager.transition(receivers, t);
       type::_graph_manager.transition(IMM, t);
-
-      std::lock_guard<std::mutex> lock(_received_mutex);
+      _condition.wait(lock);
     }
 
     parent_type::update_event_table(t);
@@ -242,11 +242,12 @@ public:
   }
 
 private:
-  std::thread *_thread;
+  std::shared_ptr <std::thread> _thread;
   artis::common::Receiver _incoming;
   artis::common::Sender _sender;
   unsigned int _received;
   std::mutex _received_mutex;
+  std::condition_variable _condition;
 };
 
 }