Multithreading.hpp 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. /**
  2. * @file common/utils/Multithreading.hpp
  3. * @author The ARTIS Development Team
  4. * See the AUTHORS or Authors.txt file
  5. */
  6. /*
  7. * ARTIS - the multimodeling and simulation environment
  8. * This file is a part of the ARTIS environment
  9. *
  10. * Copyright (C) 2013-2019 ULCO http://www.univ-littoral.fr
  11. *
  12. * This program is free software: you can redistribute it and/or modify
  13. * it under the terms of the GNU General Public License as published by
  14. * the Free Software Foundation, either version 3 of the License, or
  15. * (at your option) any later version.
  16. *
  17. * This program is distributed in the hope that it will be useful,
  18. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  19. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  20. * GNU General Public License for more details.
  21. *
  22. * You should have received a copy of the GNU General Public License
  23. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  24. */
  25. #ifndef COMMON_UTILS_MULTITHREADING
  26. #define COMMON_UTILS_MULTITHREADING
  27. #include <condition_variable>
  28. #include <queue>
  29. #include <mutex>
  30. namespace artis {
  31. namespace common {
  32. struct BaseMessage {
  33. virtual ~BaseMessage() {}
  34. };
  35. template<typename Msg>
  36. struct Message : BaseMessage {
  37. explicit Message(Msg const &content)
  38. : _content(content) {}
  39. Msg _content;
  40. };
  41. class Close {
  42. };
  43. class MessageQueue {
  44. public:
  45. MessageQueue() = default;
  46. template<typename T>
  47. void push(T const &msg) {
  48. std::lock_guard<std::mutex> lock(_mutex);
  49. _queue.push(std::make_shared<Message<T> >(msg));
  50. _condition.notify_all();
  51. }
  52. std::shared_ptr<BaseMessage> wait_and_pop() {
  53. std::unique_lock<std::mutex> lock(_mutex);
  54. _condition.wait(lock, [&] { return not _queue.empty(); });
  55. auto res = _queue.front();
  56. _queue.pop();
  57. return res;
  58. }
  59. private:
  60. std::mutex _mutex;
  61. std::condition_variable _condition;
  62. std::queue<std::shared_ptr<BaseMessage> > _queue;
  63. };
  64. class Sender {
  65. public:
  66. Sender()
  67. : _queue(0) {}
  68. explicit Sender(MessageQueue *queue)
  69. : _queue(queue) {}
  70. template<typename Message>
  71. void send(Message const &msg) {
  72. if (_queue) {
  73. _queue->push(msg);
  74. }
  75. }
  76. private:
  77. MessageQueue *_queue;
  78. };
  79. template<typename PreviousDispatcher, typename Msg, typename Func>
  80. class TemplateDispatcher {
  81. template<typename Dispatcher, typename OtherMsg, typename OtherFunc>
  82. friend
  83. class TemplateDispatcher;
  84. public:
  85. TemplateDispatcher(TemplateDispatcher &&other)
  86. : _queue(other._queue),
  87. _previous(other._previous),
  88. _function(
  89. std::move(
  90. other._function)),
  91. _chained(other._chained) { other._chained = true; }
  92. TemplateDispatcher(MessageQueue *queue,
  93. PreviousDispatcher *previous,
  94. Func &&function)
  95. :
  96. _queue(queue), _previous(previous),
  97. _function(std::forward<Func>(function)),
  98. _chained(false) { previous->_chained = true; }
  99. bool dispatch(std::shared_ptr<BaseMessage> const &msg) {
  100. Message<Msg> *message =
  101. dynamic_cast < Message<Msg> * >(msg.get());
  102. if (message) {
  103. _function(message->_content);
  104. return true;
  105. } else {
  106. return _previous->dispatch(msg);
  107. }
  108. }
  109. template<typename OtherMsg, typename OtherFunc>
  110. TemplateDispatcher<TemplateDispatcher<PreviousDispatcher, Msg, Func>,
  111. OtherMsg, OtherFunc>
  112. handle(OtherFunc &&of) {
  113. return TemplateDispatcher<TemplateDispatcher<PreviousDispatcher,
  114. Msg, Func>,
  115. OtherMsg, OtherFunc>(
  116. _queue, this, std::forward<OtherFunc>(of));
  117. }
  118. ~TemplateDispatcher() noexcept(false) {
  119. if (not _chained) {
  120. wait_and_dispatch();
  121. }
  122. }
  123. private:
  124. TemplateDispatcher(TemplateDispatcher const &) = delete;
  125. TemplateDispatcher &operator=(TemplateDispatcher const &) = delete;
  126. void wait_and_dispatch() {
  127. for (;;) {
  128. auto msg = _queue->wait_and_pop();
  129. if (dispatch(msg)) {
  130. break;
  131. }
  132. }
  133. }
  134. MessageQueue *_queue;
  135. PreviousDispatcher *_previous;
  136. Func _function;
  137. bool _chained;
  138. };
  139. class Dispatcher {
  140. template<typename Dispatcher, typename Msg, typename Func>
  141. friend
  142. class TemplateDispatcher;
  143. public:
  144. Dispatcher(Dispatcher &&other)
  145. : _queue(other._queue),
  146. _chained(other._chained) { other._chained = true; }
  147. explicit Dispatcher(MessageQueue *queue)
  148. : _queue(queue), _chained(false) {}
  149. template<typename Message, typename Func>
  150. TemplateDispatcher<Dispatcher, Message, Func>
  151. handle(Func &&function) {
  152. return TemplateDispatcher<Dispatcher, Message, Func>(
  153. _queue, this, std::forward<Func>(function));
  154. }
  155. ~Dispatcher() noexcept(false) {
  156. if (not _chained) {
  157. wait_and_dispatch();
  158. }
  159. }
  160. private:
  161. Dispatcher(Dispatcher const &) = delete;
  162. Dispatcher &operator=(Dispatcher const &) = delete;
  163. void wait_and_dispatch() {
  164. for (;;) {
  165. auto msg = _queue->wait_and_pop();
  166. dispatch(msg);
  167. }
  168. }
  169. bool dispatch(std::shared_ptr<BaseMessage> const &msg) {
  170. if (dynamic_cast < Message<Close> * >(msg.get())) {
  171. throw Close();
  172. }
  173. return false;
  174. }
  175. MessageQueue *_queue;
  176. bool _chained;
  177. };
  178. class Receiver {
  179. public:
  180. Receiver() : _queue(new MessageQueue) {}
  181. ~Receiver() { delete _queue; }
  182. Sender get_sender() { return Sender(_queue); }
  183. Dispatcher wait() { return Dispatcher(_queue); }
  184. private:
  185. MessageQueue *_queue;
  186. };
  187. }
  188. } // namespace artis common
  189. #endif