123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340 |
- /**
- * @file kernel/pdevs/multithreading/Coordinator.hpp
- * @author The ARTIS Development Team
- * See the AUTHORS or Authors.txt file
- */
- /*
- * ARTIS - the multimodeling and simulation environment
- * This file is a part of the ARTIS environment
- *
- * Copyright (C) 2013-2021 ULCO http://www.univ-littoral.fr
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
- #ifndef PDEVS_MULTITHREADING_COORDINATOR
- #define PDEVS_MULTITHREADING_COORDINATOR
- #include <artis-star/common/utils/Multithreading.hpp>
- #include <artis-star/kernel/pdevs/Coordinator.hpp>
- #include <condition_variable>
- #include <mutex>
- #include <thread>
- namespace artis::pdevs::multithreading {
- template<typename Time>
- struct start_message : common::Message {
- explicit start_message(typename Time::type t)
- : _t(t) {}
- typename Time::type _t;
- };
- template<typename Time>
- struct transition_message : common::Message {
- explicit transition_message(typename Time::type t)
- : _t(t) {}
- typename Time::type _t;
- };
- template<typename Time>
- struct done_start_message : common::Message {
- explicit done_start_message(typename Time::type tn,
- common::Model<Time> *child)
- :
- _tn(tn), _child(child) {}
- typename Time::type _tn;
- common::Model<Time> *_child;
- };
- template<typename Time>
- struct done_transition_message : common::Message {
- explicit done_transition_message(typename Time::type tn,
- common::Model<Time> *child)
- :
- _tn(tn), _child(child) {}
- typename Time::type _tn;
- common::Model<Time> *_child;
- };
- struct Queues {
- std::shared_ptr<common::MessageQueue> child_queue;
- std::shared_ptr<common::MessageQueue> parent_queue;
- std::mutex mutex;
- std::condition_variable condition;
- Queues() : child_queue(new common::MessageQueue), parent_queue(new common::MessageQueue) {}
- bool empty() const { return child_queue->empty() and parent_queue->empty(); }
- std::shared_ptr<common::Message> pop_child_queue() {
- std::unique_lock<std::mutex> lock(mutex);
- std::shared_ptr<common::Message> message = child_queue->front();
- child_queue->pop();
- return message;
- }
- std::shared_ptr<common::Message> pop_parent_queue() {
- std::unique_lock<std::mutex> lock(mutex);
- std::shared_ptr<common::Message> message = parent_queue->front();
- parent_queue->pop();
- return message;
- }
- void push_child_queue(const std::shared_ptr<common::Message> &message) {
- std::unique_lock<std::mutex> lock(mutex);
- child_queue->push(message);
- condition.notify_one();
- }
- void push_parent_queue(const std::shared_ptr<common::Message> &message) {
- std::unique_lock<std::mutex> lock(mutex);
- parent_queue->push(message);
- condition.notify_one();
- }
- };
- template<typename Time,
- typename GraphManager,
- typename Parameters = common::NoParameters,
- typename GraphParameters = common::NoParameters>
- class Coordinator
- : public pdevs::Coordinator<Time, GraphManager, Parameters, GraphParameters> {
- typedef pdevs::Coordinator<Time, GraphManager,
- Parameters, GraphParameters> parent_type;
- typedef Coordinator<Time, GraphManager,
- Parameters, GraphParameters> type;
- typedef done_start_message<Time> done_start_message_type;
- typedef start_message<Time> start_message_type;
- typedef done_transition_message<Time> done_transition_message_type;
- typedef transition_message<Time> transition_message_type;
- public:
- Coordinator(const std::string &name,
- const Parameters ¶meters,
- const GraphParameters &graph_parameters)
- :
- common::Model<Time>(name),
- pdevs::Coordinator<Time, GraphManager, Parameters, GraphParameters>(
- name, parameters, graph_parameters),
- _queues(new Queues) {
- type::_graph_manager.init();
- _thread = std::make_shared<std::thread>([&] { loop(); });
- }
- virtual ~Coordinator() {
- done();
- _thread->join();
- }
- void attach_child(common::Model<Time> *model,
- const std::shared_ptr<Queues> &child_queue) {
- _child_queues[model] = child_queue;
- }
- void attach_parent(const std::shared_ptr<Queues> &parent_queue) {
- _parent_queue = parent_queue;
- }
- void done() {
- _queues->push_parent_queue(std::shared_ptr<artis::common::Message>(new artis::common::Close));
- }
- const std::shared_ptr<Queues> &get_queue() const {
- return _queues;
- }
- bool process(const common::Message *message) {
- if (const done_transition_message<Time>
- *m = dynamic_cast< const done_transition_message<Time> *>(message)) {
- type::_event_table.put(m->_tn, m->_child);
- --_received;
- if (_received == 0) {
- std::unique_lock<std::mutex> lock(_received_mutex);
- _received_condition.notify_one();
- }
- } else if (const transition_message<Time>
- *m = dynamic_cast< const transition_message<Time> *>(message)) {
- typename Time::type tn = transition(m->_t);
- _parent_queue
- ->push_parent_queue(
- std::shared_ptr<common::Message>(new done_transition_message_type(tn, this)));
- } else if (const start_message<Time> *m = dynamic_cast< const start_message<Time> *>(message)) {
- typename Time::type tn = start(m->_t);
- _parent_queue
- ->push_parent_queue(
- std::shared_ptr<common::Message>(new done_start_message_type(tn, this)));
- } else if (const done_start_message<Time>
- *m = dynamic_cast< const done_start_message<Time> *>(message)) {
- type::_event_table.init(m->_tn, m->_child);
- --_received;
- if (_received == 0) {
- std::unique_lock<std::mutex> lock(_received_mutex);
- _received_condition.notify_one();
- }
- } else if (dynamic_cast< const artis::common::Close *>(message)) {
- return false;
- }
- return true;
- }
- void loop() {
- while (true) {
- {
- std::unique_lock<std::mutex> lock(_queues->mutex);
- while (_queues->empty()) {
- _queues->condition.wait(lock);
- }
- }
- if (not _queues->child_queue->empty()) {
- std::shared_ptr<common::Message> message = _queues->pop_child_queue();
- process(message.get());
- }
- if (not _queues->parent_queue->empty()) {
- std::shared_ptr<common::Message> message = _queues->pop_parent_queue();
- if (not process(message.get())) {
- break;
- }
- }
- }
- }
- typename Time::type start(const typename Time::type &t) {
- _received = 0;
- for (auto &child: parent_type::_graph_manager.children()) {
- if (child->is_atomic()) {
- type::_event_table.init(child->start(type::_tn), child);
- } else {
- ++_received;
- }
- }
- if (_received > 0) {
- std::unique_lock<std::mutex> lock(_received_mutex);
- for (const auto &q: _child_queues) {
- q.second
- ->push_child_queue(std::shared_ptr<artis::common::Message>(new start_message<Time>(t)));
- }
- _received_condition.wait(lock);
- }
- type::_tl = t;
- type::_tn = type::_event_table.get_current_time();
- return type::_tn;
- }
- void output(const typename Time::type &t) {
- assert(t == type::_tn);
- common::Models<Time> IMM = type::_event_table.get_current_models(t);
- for (auto &model: IMM) {
- model->output(t);
- }
- }
- typename Time::type transition(const typename Time::type &t) {
- assert(t >= type::_tl and t <= type::_tn);
- common::Models<Time> receivers = type::get_receivers();
- common::Models<Time> IMM = type::_event_table.get_current_models(t);
- // common::Models <Time>
- // IMM = type::_event_table.get_current_models(t, type::_graph_manager.lookahead(t));
- _received = 0;
- for (auto &model: receivers) {
- if (model->is_atomic()) {
- type::_event_table.put(model->transition(t), model);
- } else {
- ++_received;
- }
- }
- for (auto &model: IMM) {
- if (std::find(receivers.begin(), receivers.end(), model) == receivers.end()) {
- if (model->is_atomic()) {
- type::_event_table.put(model->transition(model->get_tn()), model);
- } else {
- ++_received;
- }
- }
- }
- if (_received > 0) {
- std::unique_lock<std::mutex> lock(_received_mutex);
- if (not receivers.empty()) {
- std::for_each(receivers.begin(), receivers.end(),
- [this, t](common::Model<Time> *model) {
- auto it = _child_queues.find(model);
- if (it != _child_queues.end()) {
- it->second->push_child_queue(
- std::shared_ptr<artis::common::Message>(
- new transition_message<Time>(t)));
- }
- });
- }
- if (not IMM.empty()) {
- std::for_each(IMM.begin(), IMM.end(),
- [this, t](common::Model<Time> *model) {
- auto it = _child_queues.find(model);
- if (it != _child_queues.end()) {
- it->second->push_child_queue(
- std::shared_ptr<artis::common::Message>(
- new transition_message<Time>(model->get_tn())));
- }
- });
- }
- _received_condition.wait(lock);
- }
- type::_tl = t;
- type::_tn = type::_event_table.get_current_time();
- type::clear_bag();
- return type::_tn;
- }
- private:
- std::shared_ptr<std::thread> _thread;
- std::shared_ptr<Queues> _queues;
- unsigned int _received;
- std::mutex _received_mutex;
- std::condition_variable _received_condition;
- std::shared_ptr<Queues> _parent_queue;
- std::map<common::Model<Time> *, std::shared_ptr<Queues >> _child_queues;
- };
- } // namespace artis pdevs multithreading
- #endif
|