Parcourir la source

pdev/multithreading: new algorithm

Eric Ramat il y a 3 ans
Parent
commit
a24e501a83

+ 0 - 8
src/artis-star/common/Model.hpp

@@ -147,14 +147,6 @@ public:
   Model<Time> *get_parent() const
   { return _parent; }
 
-  // TODO: to remove
-  virtual int get_receiver_number(typename Time::type t)
-  {
-    (void) t;
-
-    return 0;
-  }
-
   virtual const Model<Time> *get_submodel(unsigned int index) const
   {
     (void) index;

+ 4 - 221
src/artis-star/common/utils/Multithreading.hpp

@@ -27,239 +27,22 @@
 #ifndef COMMON_UTILS_MULTITHREADING
 #define COMMON_UTILS_MULTITHREADING
 
-#include <condition_variable>
 #include <queue>
-#include <mutex>
 
 namespace artis {
 namespace common {
 
-struct BaseMessage
+struct Message
 {
-  virtual ~BaseMessage()
-  {}
+  virtual ~Message() = default;
 };
 
-template<typename Msg>
-struct Message : BaseMessage
-{
-  explicit Message(Msg const &content)
-      : _content(content)
-  {}
-
-  Msg _content;
-};
-
-class Close
+struct Close : Message
 {
 };
 
-class MessageQueue
+class MessageQueue : public std::queue<std::shared_ptr < Message>>
 {
-public:
-  MessageQueue() = default;
-
-  template<typename T>
-  void push(T const &msg)
-  {
-    std::lock_guard<std::mutex> lock(_mutex);
-
-    _queue.push(std::make_shared<Message<T> >(msg));
-    _condition.notify_all();
-  }
-
-  std::shared_ptr<BaseMessage> wait_and_pop()
-  {
-    std::unique_lock<std::mutex> lock(_mutex);
-
-    _condition.wait(lock, [&] { return not _queue.empty(); });
-    auto res = _queue.front();
-    _queue.pop();
-    return res;
-  }
-
-private:
-  std::mutex _mutex;
-  std::condition_variable _condition;
-  std::queue<std::shared_ptr<BaseMessage> > _queue;
-};
-
-class Sender
-{
-public:
-  Sender()
-      : _queue(0)
-  {}
-
-  explicit Sender(MessageQueue *queue)
-      : _queue(queue)
-  {}
-
-  template<typename Message>
-  void send(Message const &msg)
-  {
-    if (_queue) {
-      _queue->push(msg);
-    }
-  }
-
-private:
-  MessageQueue *_queue;
-};
-
-template<typename PreviousDispatcher, typename Msg, typename Func>
-class TemplateDispatcher
-{
-  template<typename Dispatcher, typename OtherMsg, typename OtherFunc>
-  friend
-  class TemplateDispatcher;
-
-public:
-  TemplateDispatcher(TemplateDispatcher &&other)
-      : _queue(other._queue),
-        _previous(other._previous),
-        _function(
-            std::move(
-                other._function)),
-        _chained(other._chained)
-  { other._chained = true; }
-
-  TemplateDispatcher(MessageQueue *queue,
-                     PreviousDispatcher *previous,
-                     Func &&function)
-      :
-      _queue(queue), _previous(previous),
-      _function(std::forward<Func>(function)),
-      _chained(false)
-  { previous->_chained = true; }
-
-  bool dispatch(std::shared_ptr<BaseMessage> const &msg)
-  {
-    Message<Msg> *message =
-        dynamic_cast < Message<Msg> * >(msg.get());
-
-    if (message) {
-      _function(message->_content);
-      return true;
-    } else {
-      return _previous->dispatch(msg);
-    }
-  }
-
-  template<typename OtherMsg, typename OtherFunc>
-  TemplateDispatcher<TemplateDispatcher<PreviousDispatcher, Msg, Func>,
-                     OtherMsg, OtherFunc>
-  handle(OtherFunc &&of)
-  {
-    return TemplateDispatcher<TemplateDispatcher<PreviousDispatcher,
-                                                 Msg, Func>,
-                              OtherMsg, OtherFunc>(
-        _queue, this, std::forward<OtherFunc>(of));
-  }
-
-  ~TemplateDispatcher() noexcept(false)
-  {
-    if (not _chained) {
-      wait_and_dispatch();
-    }
-  }
-
-private:
-  TemplateDispatcher(TemplateDispatcher const &) = delete;
-
-  TemplateDispatcher &operator=(TemplateDispatcher const &) = delete;
-
-  void wait_and_dispatch()
-  {
-    for (;;) {
-      auto msg = _queue->wait_and_pop();
-
-      if (dispatch(msg)) {
-        break;
-      }
-    }
-  }
-
-  MessageQueue *_queue;
-  PreviousDispatcher *_previous;
-  Func _function;
-  bool _chained;
-};
-
-class Dispatcher
-{
-  template<typename Dispatcher, typename Msg, typename Func>
-  friend
-  class TemplateDispatcher;
-
-public:
-  Dispatcher(Dispatcher &&other)
-      : _queue(other._queue),
-        _chained(other._chained)
-  { other._chained = true; }
-
-  explicit Dispatcher(MessageQueue *queue)
-      : _queue(queue), _chained(false)
-  {}
-
-  template<typename Message, typename Func>
-  TemplateDispatcher<Dispatcher, Message, Func>
-  handle(Func &&function)
-  {
-    return TemplateDispatcher<Dispatcher, Message, Func>(
-        _queue, this, std::forward<Func>(function));
-  }
-
-  ~Dispatcher() noexcept(false)
-  {
-    if (not _chained) {
-      wait_and_dispatch();
-    }
-  }
-
-private:
-  Dispatcher(Dispatcher const &) = delete;
-
-  Dispatcher &operator=(Dispatcher const &) = delete;
-
-  void wait_and_dispatch()
-  {
-    for (;;) {
-      auto msg = _queue->wait_and_pop();
-
-      dispatch(msg);
-    }
-  }
-
-  bool dispatch(std::shared_ptr<BaseMessage> const &msg)
-  {
-    if (dynamic_cast < Message<Close> * >(msg.get())) {
-      throw Close();
-    }
-    return false;
-  }
-
-  MessageQueue *_queue;
-  bool _chained;
-};
-
-class Receiver
-{
-public:
-  Receiver() : _queue(new MessageQueue)
-  {}
-
-  ~Receiver()
-  { delete _queue; }
-
-  Sender get_sender()
-  { return Sender(_queue); }
-
-  Dispatcher wait()
-  { return Dispatcher(_queue); }
-
-private:
-  MessageQueue *_queue;
 };
 
 }

+ 165 - 67
src/artis-star/kernel/pdevs/multithreading/Coordinator.hpp

@@ -30,6 +30,8 @@
 #include <artis-star/common/utils/Multithreading.hpp>
 #include <artis-star/kernel/pdevs/Coordinator.hpp>
 
+#include <condition_variable>
+#include <mutex>
 #include <thread>
 
 namespace artis {
@@ -37,7 +39,7 @@ namespace pdevs {
 namespace multithreading {
 
 template<class Time>
-struct start_message
+struct start_message : common::Message
 {
   explicit start_message(typename Time::type t)
       : _t(t)
@@ -47,7 +49,7 @@ struct start_message
 };
 
 template<class Time>
-struct transition_message
+struct transition_message : common::Message
 {
   explicit transition_message(typename Time::type t)
       : _t(t)
@@ -57,7 +59,7 @@ struct transition_message
 };
 
 template<class Time>
-struct done_start_message
+struct done_start_message : common::Message
 {
   explicit done_start_message(typename Time::type tn,
                               common::Model <Time> *child)
@@ -70,7 +72,7 @@ struct done_start_message
 };
 
 template<class Time>
-struct done_transition_message
+struct done_transition_message : common::Message
 {
   explicit done_transition_message(typename Time::type tn,
                                    common::Model <Time> *child)
@@ -82,6 +84,54 @@ struct done_transition_message
   common::Model <Time> *_child;
 };
 
+struct Queues
+{
+  std::shared_ptr <common::MessageQueue> child_queue;
+  std::shared_ptr <common::MessageQueue> parent_queue;
+  std::mutex mutex;
+  std::condition_variable condition;
+
+  Queues() : child_queue(new common::MessageQueue), parent_queue(new common::MessageQueue)
+  {}
+
+  bool empty() const
+  { return child_queue->empty() and parent_queue->empty(); }
+
+  std::shared_ptr <common::Message> pop_child_queue()
+  {
+    std::unique_lock <std::mutex> lock(mutex);
+    std::shared_ptr <common::Message> message = child_queue->front();
+
+    child_queue->pop();
+    return message;
+  }
+
+  std::shared_ptr <common::Message> pop_parent_queue()
+  {
+    std::unique_lock <std::mutex> lock(mutex);
+    std::shared_ptr <common::Message> message = parent_queue->front();
+
+    parent_queue->pop();
+    return message;
+  }
+
+  void push_child_queue(const std::shared_ptr <common::Message> &message)
+  {
+    std::unique_lock <std::mutex> lock(mutex);
+
+    child_queue->push(message);
+    condition.notify_one();
+  }
+
+  void push_parent_queue(const std::shared_ptr <common::Message> &message)
+  {
+    std::unique_lock <std::mutex> lock(mutex);
+
+    parent_queue->push(message);
+    condition.notify_one();
+  }
+};
+
 template<class Time,
     class GraphManager,
     class Parameters = common::NoParameters,
@@ -99,12 +149,14 @@ class Coordinator
   typedef transition_message<Time> transition_message_type;
 
 public:
-  Coordinator(const std::string &name, const Parameters &parameters,
+  Coordinator(const std::string &name,
+              const Parameters &parameters,
               const GraphParameters &graph_parameters)
       :
       common::Model<Time>(name),
       pdevs::Coordinator<Time, GraphManager, Parameters, GraphParameters>(
-          name, parameters, graph_parameters)
+          name, parameters, graph_parameters),
+      _queues(new Queues)
   {
     type::_graph_manager.init();
     _thread = std::make_shared<std::thread>([&] { loop(); });
@@ -116,60 +168,93 @@ public:
     _thread->join();
   }
 
+  void attach_child(common::Model <Time> *model,
+                    const std::shared_ptr <Queues> &child_queue)
+  {
+    _child_queues[model] = child_queue;
+  }
+
+  void attach_parent(const std::shared_ptr <Queues> &parent_queue)
+  {
+    _parent_queue = parent_queue;
+  }
+
   void done()
-  { get_sender().send(artis::common::Close()); }
+  {
+    _queues->push_parent_queue(std::shared_ptr<artis::common::Message>(new artis::common::Close));
+  }
 
-  artis::common::Sender get_sender()
-  { return _incoming.get_sender(); }
+  const std::shared_ptr <Queues> &get_queue() const
+  {
+    return _queues;
+  }
 
-  void set_sender(common::Sender sender)
-  { _sender = sender; }
+  bool process(const common::Message *message)
+  {
+    if (const done_transition_message<Time>
+        *m = dynamic_cast< const done_transition_message<Time> *>(message)) {
+      type::_event_table.put(m->_tn, m->_child);
+      --_received;
+      if (_received == 0) {
+        std::unique_lock <std::mutex> lock(_received_mutex);
+
+        _received_condition.notify_one();
+      }
+    } else if (const transition_message<Time>
+        *m = dynamic_cast< const transition_message<Time> *>(message)) {
+      typename Time::type tn = transition(m->_t);
+
+      _parent_queue
+          ->push_parent_queue(
+              std::shared_ptr<common::Message>(new done_transition_message_type(tn, this)));
+    } else if (const start_message<Time> *m = dynamic_cast< const start_message<Time> *>(message)) {
+      typename Time::type tn = start(m->_t);
+
+      _parent_queue
+          ->push_parent_queue(
+              std::shared_ptr<common::Message>(new done_start_message_type(tn, this)));
+    } else if (const done_start_message<Time>
+        *m = dynamic_cast< const done_start_message<Time> *>(message)) {
+      type::_event_table.init(m->_tn, m->_child);
+      --_received;
+      if (_received == 0) {
+        std::unique_lock <std::mutex> lock(_received_mutex);
+
+        _received_condition.notify_one();
+      }
+    } else if (dynamic_cast< const artis::common::Close *>(message)) {
+      return false;
+    }
+    return true;
+  }
 
   void loop()
   {
-    try {
-      while (true) {
-        _incoming.wait()
-            .template handle<start_message_type>(
-                [this](start_message_type const &msg) {
-                  typename Time::type tn = start(msg._t);
-                  _sender.send(done_start_message_type(tn, this));
-                })
-            .
-                template handle<done_start_message_type>(
-                [this](done_start_message_type const &msg) {
-                  type::_event_table.init(msg._tn, msg._child);
-                  --_received;
-                  if (_received == 0) {
-                    std::unique_lock <std::mutex> lock(_received_mutex);
-
-                    _condition.notify_one();
-                  }
-                })
-            .
-                template handle<transition_message_type>(
-                [this](transition_message_type const &msg) {
-                  typename Time::type tn = transition(msg._t);
-                  _sender.send(done_transition_message_type(tn, this));
-                })
-            .
-                template handle<done_transition_message_type>(
-                [this](done_transition_message_type const &msg) {
-                  type::_event_table.put(msg._tn, msg._child);
-                  --_received;
-                  if (_received == 0) {
-                    std::unique_lock <std::mutex> lock(_received_mutex);
-
-                    _condition.notify_one();
-                  }
-                });
+    while (true) {
+      {
+        std::unique_lock <std::mutex> lock(_queues->mutex);
+
+        while (_queues->empty()) {
+          _queues->condition.wait(lock);
+        }
+      }
+      if (not _queues->child_queue->empty()) {
+        std::shared_ptr <common::Message> message = _queues->pop_child_queue();
+
+        process(message.get());
+      }
+      if (not _queues->parent_queue->empty()) {
+        std::shared_ptr <common::Message> message = _queues->pop_parent_queue();
+
+        if (not process(message.get())) {
+          break;
+        }
       }
-    }
-    catch (artis::common::Close const &) {
     }
   }
 
-  typename Time::type start(const typename Time::type &t)
+  typename Time::type
+  start(const typename Time::type &t)
   {
     _received = 0;
     for (auto &child : parent_type::_graph_manager.children()) {
@@ -183,8 +268,11 @@ public:
     if (_received > 0) {
       std::unique_lock <std::mutex> lock(_received_mutex);
 
-      type::_graph_manager.start(t);
-      _condition.wait(lock);
+      for (const auto &q:_child_queues) {
+        q.second
+            ->push_child_queue(std::shared_ptr<artis::common::Message>(new start_message<Time>(t)));
+      }
+      _received_condition.wait(lock);
     }
 
     type::_tl = t;
@@ -192,18 +280,11 @@ public:
     return type::_tn;
   }
 
-  // TODO: to remove
-  virtual int get_receiver_number(typename Time::type t)
-  {
-    return type::_event_table.get_current_models(t).size();
-  }
-
   typename Time::type transition(const typename Time::type &t)
   {
     assert(t >= type::_tl and t <= type::_tn);
 
     common::Models <Time> receivers = type::get_receivers();
-//  common::Models<Time> IMM = type::_event_table.get_current_models(t);
     common::Models <Time> IMM =
         type::_event_table.get_current_models(t, type::_graph_manager.lookahead(t));
 
@@ -229,15 +310,31 @@ public:
       std::unique_lock <std::mutex> lock(_received_mutex);
 
       if (not receivers.empty()) {
-        type::_graph_manager.transition(receivers, t);
+        std::for_each(receivers.begin(), receivers.end(),
+                      [this, t](common::Model <Time> *model) {
+                        auto it = _child_queues.find(model);
+
+                        if (it != _child_queues.end()) {
+                          it->second->push_child_queue(
+                              std::shared_ptr<artis::common::Message>(
+                                  new transition_message<Time>(t)));
+                        }
+                      });
       }
       if (not IMM.empty()) {
-        type::_graph_manager.transition(IMM, t);
+        std::for_each(IMM.begin(), IMM.end(),
+                      [this, t](common::Model <Time> *model) {
+                        auto it = _child_queues.find(model);
+
+                        if (it != _child_queues.end()) {
+                          it->second->push_child_queue(
+                              std::shared_ptr<artis::common::Message>(
+                                  new transition_message<Time>(t)));
+                        }
+                      });
       }
-      _condition.wait(lock);
+      _received_condition.wait(lock);
     }
-
-//    parent_type::update_event_table(t);
     type::_tl = t;
     type::_tn = type::_event_table.get_current_time();
     type::clear_bag();
@@ -247,11 +344,12 @@ public:
 
 private:
   std::shared_ptr <std::thread> _thread;
-  artis::common::Receiver _incoming;
-  artis::common::Sender _sender;
+  std::shared_ptr <Queues> _queues;
   unsigned int _received;
   std::mutex _received_mutex;
-  std::condition_variable _condition;
+  std::condition_variable _received_condition;
+  std::shared_ptr <Queues> _parent_queue;
+  std::map<common::Model < Time> *, std::shared_ptr <Queues >> _child_queues;
 };
 
 }