Coordinator.hpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. /**
  2. * @file kernel/pdevs/multithreading/Coordinator.hpp
  3. * @author The ARTIS Development Team
  4. * See the AUTHORS or Authors.txt file
  5. */
  6. /*
  7. * ARTIS - the multimodeling and simulation environment
  8. * This file is a part of the ARTIS environment
  9. *
  10. * Copyright (C) 2013-2022 ULCO http://www.univ-littoral.fr
  11. *
  12. * This program is free software: you can redistribute it and/or modify
  13. * it under the terms of the GNU General Public License as published by
  14. * the Free Software Foundation, either version 3 of the License, or
  15. * (at your option) any later version.
  16. *
  17. * This program is distributed in the hope that it will be useful,
  18. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  19. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  20. * GNU General Public License for more details.
  21. *
  22. * You should have received a copy of the GNU General Public License
  23. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  24. */
  25. #ifndef PDEVS_MULTITHREADING_COORDINATOR
  26. #define PDEVS_MULTITHREADING_COORDINATOR
#include <artis-star/common/utils/Multithreading.hpp>
#include <artis-star/kernel/pdevs/Coordinator.hpp>

#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
  32. namespace artis::pdevs::multithreading {
  33. template<typename Time>
  34. struct start_message : common::Message {
  35. explicit start_message(typename Time::type t)
  36. : _t(t) {}
  37. typename Time::type _t;
  38. };
  39. template<typename Time>
  40. struct transition_message : common::Message {
  41. explicit transition_message(typename Time::type t)
  42. : _t(t) {}
  43. typename Time::type _t;
  44. };
  45. template<typename Time>
  46. struct done_start_message : common::Message {
  47. explicit done_start_message(typename Time::type tn,
  48. common::Model<Time> *child)
  49. :
  50. _tn(tn), _child(child) {}
  51. typename Time::type _tn;
  52. common::Model<Time> *_child;
  53. };
  54. template<typename Time>
  55. struct done_transition_message : common::Message {
  56. explicit done_transition_message(typename Time::type tn,
  57. common::Model<Time> *child)
  58. :
  59. _tn(tn), _child(child) {}
  60. typename Time::type _tn;
  61. common::Model<Time> *_child;
  62. };
  63. struct Queues {
  64. std::shared_ptr<common::MessageQueue> child_queue;
  65. std::shared_ptr<common::MessageQueue> parent_queue;
  66. std::mutex mutex;
  67. std::condition_variable condition;
  68. Queues() : child_queue(new common::MessageQueue), parent_queue(new common::MessageQueue) {}
  69. bool empty() const { return child_queue->empty() and parent_queue->empty(); }
  70. std::shared_ptr<common::Message> pop_child_queue() {
  71. std::unique_lock<std::mutex> lock(mutex);
  72. std::shared_ptr<common::Message> message = child_queue->front();
  73. child_queue->pop();
  74. return message;
  75. }
  76. std::shared_ptr<common::Message> pop_parent_queue() {
  77. std::unique_lock<std::mutex> lock(mutex);
  78. std::shared_ptr<common::Message> message = parent_queue->front();
  79. parent_queue->pop();
  80. return message;
  81. }
  82. void push_child_queue(const std::shared_ptr<common::Message> &message) {
  83. std::unique_lock<std::mutex> lock(mutex);
  84. child_queue->push(message);
  85. condition.notify_one();
  86. }
  87. void push_parent_queue(const std::shared_ptr<common::Message> &message) {
  88. std::unique_lock<std::mutex> lock(mutex);
  89. parent_queue->push(message);
  90. condition.notify_one();
  91. }
  92. };
  93. template<typename Time,
  94. typename GraphManager,
  95. typename Parameters = common::NoParameters,
  96. typename GraphParameters = common::NoParameters>
  97. class Coordinator
  98. : public pdevs::Coordinator<Time, GraphManager, Parameters, GraphParameters> {
  99. typedef pdevs::Coordinator<Time, GraphManager,
  100. Parameters, GraphParameters> parent_type;
  101. typedef Coordinator<Time, GraphManager,
  102. Parameters, GraphParameters> type;
  103. typedef done_start_message<Time> done_start_message_type;
  104. typedef start_message<Time> start_message_type;
  105. typedef done_transition_message<Time> done_transition_message_type;
  106. typedef transition_message<Time> transition_message_type;
  107. public:
  108. Coordinator(const std::string &name,
  109. const Parameters &parameters,
  110. const GraphParameters &graph_parameters)
  111. :
  112. common::Model<Time>(name),
  113. pdevs::Coordinator<Time, GraphManager, Parameters, GraphParameters>(
  114. name, parameters, graph_parameters),
  115. _queues(new Queues) {
  116. type::_graph_manager.init();
  117. _thread = std::make_shared<std::thread>([&] { loop(); });
  118. }
  119. virtual ~Coordinator() {
  120. done();
  121. _thread->join();
  122. }
  123. void attach_child(common::Model<Time> *model,
  124. const std::shared_ptr<Queues> &child_queue) {
  125. _child_queues[model] = child_queue;
  126. }
  127. void attach_parent(const std::shared_ptr<Queues> &parent_queue) {
  128. _parent_queue = parent_queue;
  129. }
  130. void done() {
  131. _queues->push_parent_queue(std::shared_ptr<artis::common::Message>(new artis::common::Close));
  132. }
  133. const std::shared_ptr<Queues> &get_queue() const {
  134. return _queues;
  135. }
  136. bool process(const common::Message *message) {
  137. if (const done_transition_message<Time>
  138. *m = dynamic_cast< const done_transition_message<Time> *>(message)) {
  139. type::_event_table.put(m->_tn, m->_child);
  140. --_received;
  141. if (_received == 0) {
  142. std::unique_lock<std::mutex> lock(_received_mutex);
  143. _received_condition.notify_one();
  144. }
  145. } else if (const transition_message<Time>
  146. *m = dynamic_cast< const transition_message<Time> *>(message)) {
  147. typename Time::type tn = transition(m->_t);
  148. _parent_queue
  149. ->push_parent_queue(
  150. std::shared_ptr<common::Message>(new done_transition_message_type(tn, this)));
  151. } else if (const start_message<Time> *m = dynamic_cast< const start_message<Time> *>(message)) {
  152. typename Time::type tn = start(m->_t);
  153. _parent_queue
  154. ->push_parent_queue(
  155. std::shared_ptr<common::Message>(new done_start_message_type(tn, this)));
  156. } else if (const done_start_message<Time>
  157. *m = dynamic_cast< const done_start_message<Time> *>(message)) {
  158. type::_event_table.init(m->_tn, m->_child);
  159. --_received;
  160. if (_received == 0) {
  161. std::unique_lock<std::mutex> lock(_received_mutex);
  162. _received_condition.notify_one();
  163. }
  164. } else if (dynamic_cast< const artis::common::Close *>(message)) {
  165. return false;
  166. }
  167. return true;
  168. }
  169. void loop() {
  170. while (true) {
  171. {
  172. std::unique_lock<std::mutex> lock(_queues->mutex);
  173. while (_queues->empty()) {
  174. _queues->condition.wait(lock);
  175. }
  176. }
  177. if (not _queues->child_queue->empty()) {
  178. std::shared_ptr<common::Message> message = _queues->pop_child_queue();
  179. process(message.get());
  180. }
  181. if (not _queues->parent_queue->empty()) {
  182. std::shared_ptr<common::Message> message = _queues->pop_parent_queue();
  183. if (not process(message.get())) {
  184. break;
  185. }
  186. }
  187. }
  188. }
  189. typename Time::type start(const typename Time::type &t) {
  190. _received = 0;
  191. for (auto &child: parent_type::_graph_manager.children()) {
  192. if (child->is_atomic()) {
  193. type::_event_table.init(child->start(type::_tn), child);
  194. } else {
  195. ++_received;
  196. }
  197. }
  198. if (_received > 0) {
  199. std::unique_lock<std::mutex> lock(_received_mutex);
  200. for (const auto &q: _child_queues) {
  201. q.second
  202. ->push_child_queue(std::shared_ptr<artis::common::Message>(new start_message<Time>(t)));
  203. }
  204. _received_condition.wait(lock);
  205. }
  206. type::_tl = t;
  207. type::_tn = type::_event_table.get_current_time();
  208. return type::_tn;
  209. }
  210. void output(const typename Time::type &t) {
  211. assert(t == type::_tn);
  212. common::Models<Time> IMM = type::_event_table.get_current_models(t);
  213. for (auto &model: IMM) {
  214. model->output(t);
  215. }
  216. }
  217. typename Time::type transition(const typename Time::type &t) {
  218. assert(t >= type::_tl and t <= type::_tn);
  219. common::Models<Time> receivers = type::get_receivers();
  220. common::Models<Time> IMM = type::_event_table.get_current_models(t);
  221. // common::Models <Time>
  222. // IMM = type::_event_table.get_current_models(t, type::_graph_manager.lookahead(t));
  223. _received = 0;
  224. for (auto &model: receivers) {
  225. if (model->is_atomic()) {
  226. type::_event_table.put(model->transition(t), model);
  227. } else {
  228. ++_received;
  229. }
  230. }
  231. for (auto &model: IMM) {
  232. if (std::find(receivers.begin(), receivers.end(), model) == receivers.end()) {
  233. if (model->is_atomic()) {
  234. type::_event_table.put(model->transition(model->get_tn()), model);
  235. } else {
  236. ++_received;
  237. }
  238. }
  239. }
  240. if (_received > 0) {
  241. std::unique_lock<std::mutex> lock(_received_mutex);
  242. if (not receivers.empty()) {
  243. std::for_each(receivers.begin(), receivers.end(),
  244. [this, t](common::Model<Time> *model) {
  245. auto it = _child_queues.find(model);
  246. if (it != _child_queues.end()) {
  247. it->second->push_child_queue(
  248. std::shared_ptr<artis::common::Message>(
  249. new transition_message<Time>(t)));
  250. }
  251. });
  252. }
  253. if (not IMM.empty()) {
  254. std::for_each(IMM.begin(), IMM.end(),
  255. [this, t](common::Model<Time> *model) {
  256. auto it = _child_queues.find(model);
  257. if (it != _child_queues.end()) {
  258. it->second->push_child_queue(
  259. std::shared_ptr<artis::common::Message>(
  260. new transition_message<Time>(model->get_tn())));
  261. }
  262. });
  263. }
  264. _received_condition.wait(lock);
  265. }
  266. type::_tl = t;
  267. type::_tn = type::_event_table.get_current_time();
  268. type::clear_bag();
  269. return type::_tn;
  270. }
  271. private:
  272. std::shared_ptr<std::thread> _thread;
  273. std::shared_ptr<Queues> _queues;
  274. unsigned int _received;
  275. std::mutex _received_mutex;
  276. std::condition_variable _received_condition;
  277. std::shared_ptr<Queues> _parent_queue;
  278. std::map<common::Model<Time> *, std::shared_ptr<Queues >> _child_queues;
  279. };
  280. } // namespace artis pdevs multithreading
  281. #endif