/**
* @file kernel/pdevs/multithreading/Coordinator.hpp
* @author The ARTIS Development Team
* See the AUTHORS or Authors.txt file
*/
/*
* ARTIS - the multimodeling and simulation environment
* This file is a part of the ARTIS environment
*
* Copyright (C) 2013-2022 ULCO http://www.univ-littoral.fr
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
#ifndef PDEVS_MULTITHREADING_COORDINATOR
#define PDEVS_MULTITHREADING_COORDINATOR
#include
#include
#include
#include
#include
namespace artis::pdevs::multithreading {
template
struct start_message : common::Message {
explicit start_message(typename Time::type t)
: _t(t) {}
typename Time::type _t;
};
template
struct transition_message : common::Message {
explicit transition_message(typename Time::type t)
: _t(t) {}
typename Time::type _t;
};
template
struct done_start_message : common::Message {
explicit done_start_message(typename Time::type tn,
common::Model *child)
:
_tn(tn), _child(child) {}
typename Time::type _tn;
common::Model *_child;
};
template
struct done_transition_message : common::Message {
explicit done_transition_message(typename Time::type tn,
common::Model *child)
:
_tn(tn), _child(child) {}
typename Time::type _tn;
common::Model *_child;
};
struct Queues {
std::shared_ptr child_queue;
std::shared_ptr parent_queue;
std::mutex mutex;
std::condition_variable condition;
Queues() : child_queue(new common::MessageQueue), parent_queue(new common::MessageQueue) {}
bool empty() const { return child_queue->empty() and parent_queue->empty(); }
std::shared_ptr pop_child_queue() {
std::unique_lock lock(mutex);
std::shared_ptr message = child_queue->front();
child_queue->pop();
return message;
}
std::shared_ptr pop_parent_queue() {
std::unique_lock lock(mutex);
std::shared_ptr message = parent_queue->front();
parent_queue->pop();
return message;
}
void push_child_queue(const std::shared_ptr &message) {
std::unique_lock lock(mutex);
child_queue->push(message);
condition.notify_one();
}
void push_parent_queue(const std::shared_ptr &message) {
std::unique_lock lock(mutex);
parent_queue->push(message);
condition.notify_one();
}
};
template
class Coordinator
: public pdevs::Coordinator {
typedef pdevs::Coordinator parent_type;
typedef Coordinator type;
typedef done_start_message done_start_message_type;
typedef start_message start_message_type;
typedef done_transition_message done_transition_message_type;
typedef transition_message transition_message_type;
public:
Coordinator(const std::string &name,
const Parameters ¶meters,
const GraphParameters &graph_parameters)
:
common::Model(name),
pdevs::Coordinator(
name, parameters, graph_parameters),
_queues(new Queues) {
type::_graph_manager.init();
_thread = std::make_shared([&] { loop(); });
}
virtual ~Coordinator() {
done();
_thread->join();
}
void attach_child(common::Model *model,
const std::shared_ptr &child_queue) {
_child_queues[model] = child_queue;
}
void attach_parent(const std::shared_ptr &parent_queue) {
_parent_queue = parent_queue;
}
void done() {
_queues->push_parent_queue(std::shared_ptr(new artis::common::Close));
}
const std::shared_ptr &get_queue() const {
return _queues;
}
bool process(const common::Message *message) {
if (const done_transition_message
*m = dynamic_cast< const done_transition_message *>(message)) {
type::_event_table.put(m->_tn, m->_child);
--_received;
if (_received == 0) {
std::unique_lock lock(_received_mutex);
_received_condition.notify_one();
}
} else if (const transition_message
*m = dynamic_cast< const transition_message *>(message)) {
typename Time::type tn = transition(m->_t);
_parent_queue
->push_parent_queue(
std::shared_ptr(new done_transition_message_type(tn, this)));
} else if (const start_message *m = dynamic_cast< const start_message *>(message)) {
typename Time::type tn = start(m->_t);
_parent_queue
->push_parent_queue(
std::shared_ptr(new done_start_message_type(tn, this)));
} else if (const done_start_message
*m = dynamic_cast< const done_start_message *>(message)) {
type::_event_table.init(m->_tn, m->_child);
--_received;
if (_received == 0) {
std::unique_lock lock(_received_mutex);
_received_condition.notify_one();
}
} else if (dynamic_cast< const artis::common::Close *>(message)) {
return false;
}
return true;
}
void loop() {
while (true) {
{
std::unique_lock lock(_queues->mutex);
while (_queues->empty()) {
_queues->condition.wait(lock);
}
}
if (not _queues->child_queue->empty()) {
std::shared_ptr message = _queues->pop_child_queue();
process(message.get());
}
if (not _queues->parent_queue->empty()) {
std::shared_ptr message = _queues->pop_parent_queue();
if (not process(message.get())) {
break;
}
}
}
}
typename Time::type start(const typename Time::type &t) {
_received = 0;
for (auto &child: parent_type::_graph_manager.children()) {
if (child->is_atomic()) {
type::_event_table.init(child->start(type::_tn), child);
} else {
++_received;
}
}
if (_received > 0) {
std::unique_lock lock(_received_mutex);
for (const auto &q: _child_queues) {
q.second
->push_child_queue(std::shared_ptr(new start_message(t)));
}
_received_condition.wait(lock);
}
type::_tl = t;
type::_tn = type::_event_table.get_current_time();
return type::_tn;
}
void output(const typename Time::type &t) {
assert(t == type::_tn);
common::Models IMM = type::_event_table.get_current_models(t);
for (auto &model: IMM) {
model->output(t);
}
}
typename Time::type transition(const typename Time::type &t) {
assert(t >= type::_tl and t <= type::_tn);
common::Models receivers = type::get_receivers();
common::Models IMM = type::_event_table.get_current_models(t);
// common::Models
// IMM = type::_event_table.get_current_models(t, type::_graph_manager.lookahead(t));
_received = 0;
for (auto &model: receivers) {
if (model->is_atomic()) {
type::_event_table.put(model->transition(t), model);
} else {
++_received;
}
}
for (auto &model: IMM) {
if (std::find(receivers.begin(), receivers.end(), model) == receivers.end()) {
if (model->is_atomic()) {
type::_event_table.put(model->transition(model->get_tn()), model);
} else {
++_received;
}
}
}
if (_received > 0) {
std::unique_lock lock(_received_mutex);
if (not receivers.empty()) {
std::for_each(receivers.begin(), receivers.end(),
[this, t](common::Model *model) {
auto it = _child_queues.find(model);
if (it != _child_queues.end()) {
it->second->push_child_queue(
std::shared_ptr(
new transition_message(t)));
}
});
}
if (not IMM.empty()) {
std::for_each(IMM.begin(), IMM.end(),
[this, t](common::Model *model) {
auto it = _child_queues.find(model);
if (it != _child_queues.end()) {
it->second->push_child_queue(
std::shared_ptr(
new transition_message(model->get_tn())));
}
});
}
_received_condition.wait(lock);
}
type::_tl = t;
type::_tn = type::_event_table.get_current_time();
type::clear_bag();
return type::_tn;
}
private:
std::shared_ptr _thread;
std::shared_ptr _queues;
unsigned int _received;
std::mutex _received_mutex;
std::condition_variable _received_condition;
std::shared_ptr _parent_queue;
std::map *, std::shared_ptr> _child_queues;
};
} // namespace artis pdevs multithreading
#endif