/** * @file kernel/pdevs/multithreading/Coordinator.hpp * @author The ARTIS Development Team * See the AUTHORS or Authors.txt file */ /* * ARTIS - the multimodeling and simulation environment * This file is a part of the ARTIS environment * * Copyright (C) 2013-2018 ULCO http://www.univ-littoral.fr * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #ifndef PDEVS_MULTITHREADING_COORDINATOR #define PDEVS_MULTITHREADING_COORDINATOR 1 #include #include #include namespace artis { namespace pdevs { namespace multithreading { template < class Time> struct start_message { explicit start_message(typename Time::type t) : _t(t) { } typename Time::type _t; }; template < class Time> struct transition_message { explicit transition_message(typename Time::type t) : _t(t) { } typename Time::type _t; }; template < class Time > struct done_start_message { explicit done_start_message(typename Time::type tn, common::Model < Time >* child) : _tn(tn), _child(child) { } typename Time::type _tn; common::Model < Time >* _child; }; template < class Time > struct done_transition_message { explicit done_transition_message(typename Time::type tn, common::Model < Time >* child) : _tn(tn), _child(child) { } typename Time::type _tn; common::Model < Time >* _child; }; template < class Time, class GraphManager, class Parameters = common::NoParameters, class GraphParameters = common::NoParameters > class Coordinator : public pdevs::Coordinator < Time, GraphManager, Parameters, GraphParameters > { typedef pdevs::Coordinator < Time, GraphManager, Parameters, GraphParameters > parent_type; typedef Coordinator < Time, GraphManager, Parameters, GraphParameters > type; typedef done_start_message < Time > done_start_message_type; typedef start_message < Time > start_message_type; typedef done_transition_message < Time > done_transition_message_type; typedef transition_message < Time > transition_message_type; public: Coordinator(const std::string& name, const Parameters& parameters, const GraphParameters& graph_parameters) : common::Model < Time >(name), pdevs::Coordinator < Time, GraphManager, Parameters, GraphParameters >(name, parameters, graph_parameters) { type::_graph_manager.init(); _thread = new std::thread([&]{ loop(); }); } virtual ~Coordinator() { done(); _thread->join(); delete _thread; } void done() { get_sender().send(artis::common::Close()); } artis::common::Sender get_sender() { return _incoming; } void set_sender(common::Sender sender) { _sender = sender; } void loop() { try { for(;;) { _incoming.wait() .template handle < start_message_type >( [&](start_message_type const& msg) { typename Time::type tn = start(msg._t); _sender.send(done_start_message_type(tn, this)); }) .template handle < done_start_message_type >( [&](done_start_message_type const& msg) { type::_event_table.init(msg._tn, msg._child); --_received; if (_received == 0) { _received_mutex.unlock(); } }) .template handle < transition_message_type >( [&](transition_message_type const& msg) { typename Time::type tn = transition(msg._t); _sender.send(done_transition_message_type(tn, this)); }) .template handle < done_transition_message_type >( [&](done_transition_message_type const& msg) { type::_event_table.put(msg._tn, msg._child); --_received; if (_received == 0) { _received_mutex.unlock(); } }); } } catch(artis::common::Close const&) { } } typename Time::type start(const typename Time::type& t) { _received = 0; for (auto & child : parent_type::_graph_manager.children()) { if (child->is_atomic()) { type::_event_table.init(child->start(type::_tn), child); } else { ++_received; } } if (_received > 0) { _received_mutex.lock(); type::_graph_manager.start(t); std::lock_guard < std::mutex > lock(_received_mutex); } type::_tl = t; type::_tn = type::_event_table.get_current_time(); return type::_tn; } // TODO: to remove virtual int get_receiver_number(typename Time::type t) { return type::_event_table.get_current_models(t).size(); } typename Time::type transition(const typename Time::type& t) { assert(t >= type::_tl and t <= type::_tn); common::Models < Time > receivers = type::_event_table.get_current_models(t); type::add_models_with_inputs(receivers); _received = 0; for (auto & model : receivers) { if (model->is_atomic()) { type::_event_table.put(model->transition(t), model); } else { ++_received; } } if (_received > 0) { _received_mutex.lock(); type::_graph_manager.transition(receivers, t); std::lock_guard < std::mutex > lock(_received_mutex); } parent_type::update_event_table(t); type::_tl = t; type::_tn = type::_event_table.get_current_time(); type::clear_bag(); return type::_tn; } private: std::thread* _thread; artis::common::Receiver _incoming; artis::common::Sender _sender; unsigned int _received; std::mutex _received_mutex; }; } } } // namespace artis pdevs multithreading #endif