/**
 * @file common/utils/Multithreading.hpp
 * @author The ARTIS Development Team
 * See the AUTHORS or Authors.txt file
 */

/*
 * ARTIS - the multimodeling and simulation environment
 * This file is a part of the ARTIS environment
 *
 * Copyright (C) 2013-2019 ULCO http://www.univ-littoral.fr
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef COMMON_UTILS_MULTITHREADING
#define COMMON_UTILS_MULTITHREADING 1

#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

namespace artis {
namespace common {

/**
 * @brief Polymorphic root of every queued message.
 *
 * The virtual destructor makes the type polymorphic, which the dispatchers
 * rely on to recover the concrete Message<Msg> with dynamic_cast.
 */
struct BaseMessage {
    virtual ~BaseMessage() = default;
};

/**
 * @brief Typed envelope holding a copy of a payload of type Msg.
 * @tparam Msg payload type; must be copy-constructible.
 */
template<typename Msg>
struct Message : BaseMessage {
    explicit Message(Msg const& content)
            :_content(content) { }

    Msg _content;
};

/**
 * @brief Payload type that Dispatcher treats as a stop request.
 *
 * When a Message<Close> reaches the root Dispatcher, a Close object is thrown.
 */
class Close {
};

/**
 * @brief FIFO of type-erased messages guarded by a mutex and a condition
 *        variable.
 */
class MessageQueue {
public:
    MessageQueue() = default;

    /**
     * @brief Wraps a copy of @p msg in a Message<T>, appends it to the queue
     *        and wakes every thread blocked in wait_and_pop().
     * @tparam T payload type
     * @param msg payload to copy into the queue
     */
    template<typename T>
    void push(T const& msg)
    {
        // Build the envelope before taking the lock so the allocation does
        // not lengthen the critical section.
        std::shared_ptr<BaseMessage> envelope = std::make_shared<Message<T> >(msg);

        std::lock_guard<std::mutex> lock(_mutex);

        _queue.push(std::move(envelope));
        _condition.notify_all();
    }

    /**
     * @brief Blocks until the queue is non-empty, then removes and returns
     *        the oldest message.
     * @return the message that was at the front of the queue
     */
    std::shared_ptr<BaseMessage> wait_and_pop()
    {
        std::unique_lock<std::mutex> lock(_mutex);

        _condition.wait(lock, [this] { return !_queue.empty(); });

        // Move out of the slot that is about to be popped: avoids a
        // reference-count increment/decrement pair.
        std::shared_ptr<BaseMessage> front = std::move(_queue.front());

        _queue.pop();
        return front;
    }

private:
    std::mutex _mutex;
    std::condition_variable _condition;
    std::queue<std::shared_ptr<BaseMessage> > _queue;
};

/**
 * @brief Non-owning handle used to push messages into a MessageQueue.
 *
 * A default-constructed Sender is bound to no queue and silently ignores
 * every send(). The Sender only stores the pointer it was given; it never
 * deletes the queue.
 */
class Sender {
public:
    Sender()
            :_queue(nullptr) { }

    explicit Sender(MessageQueue* queue)
            :_queue(queue) { }

    /**
     * @brief Pushes a copy of @p msg into the bound queue; does nothing when
     *        no queue is bound.
     * @tparam Msg payload type
     * @param msg payload to send
     */
    template<typename Msg>
    void send(Msg const& msg)
    {
        if (_queue) {
            _queue->push(msg);
        }
    }

private:
    MessageQueue* _queue;
};

/**
 * @brief One link of a handler chain: handles payloads of type Msg with a
 *        callable and forwards every other message to the previous link.
 *
 * The last link of a chain (the one on which handle() was not called again)
 * does the actual work in its destructor: it pops messages until one of them
 * is handled by some link of the chain.
 *
 * @tparam PreviousDispatcher type of the previous link (Dispatcher or another
 *         TemplateDispatcher)
 * @tparam Msg payload type handled by this link
 * @tparam Func callable type; may be an lvalue-reference type when handle()
 *         was given an lvalue
 */
template<typename PreviousDispatcher, typename Msg, typename Func>
class TemplateDispatcher {
    // Every instantiation must be able to set _chained on (and call
    // dispatch of) the link it is chained to.
    template<typename OtherPrevious, typename OtherMsg, typename OtherFunc>
    friend class TemplateDispatcher;

public:
    /**
     * @brief Takes over the role of @p other; @p other is marked as chained
     *        so that its destructor no longer waits for a message.
     */
    TemplateDispatcher(TemplateDispatcher&& other)
            :_queue(other._queue), _previous(other._previous),
             // std::forward rather than std::move: when Func is an lvalue
             // reference, the reference member must be rebound to an lvalue.
             _function(std::forward<Func>(other._function)),
             _chained(other._chained) { other._chained = true; }

    /**
     * @brief Appends a link after @p previous and marks @p previous as
     *        chained.
     * @param queue queue the chain reads from
     * @param previous link that receives the messages this link does not
     *        handle; must not be null
     * @param function callable invoked with the payload of matching messages
     */
    TemplateDispatcher(MessageQueue* queue, PreviousDispatcher* previous,
            Func&& function)
            :_queue(queue), _previous(previous),
             _function(std::forward<Func>(function)),
             _chained(false) { previous->_chained = true; }

    /**
     * @brief Offers @p msg to this link, then to the previous ones.
     * @return true when this link or an earlier TemplateDispatcher link
     *         called its callable; false when no link matched. May
     *         propagate Close thrown by the root Dispatcher.
     */
    bool dispatch(std::shared_ptr<BaseMessage> const& msg)
    {
        Message<Msg>* typed = dynamic_cast<Message<Msg>*>(msg.get());

        if (!typed) {
            return _previous->dispatch(msg);
        }
        _function(typed->_content);
        return true;
    }

    /**
     * @brief Appends a handler for payloads of type OtherMsg.
     * @tparam OtherMsg payload type, given explicitly by the caller
     * @tparam OtherFunc callable type, deduced from @p of
     * @return the new last link of the chain
     */
    template<typename OtherMsg, typename OtherFunc>
    TemplateDispatcher<TemplateDispatcher<PreviousDispatcher, Msg, Func>,
            OtherMsg, OtherFunc>
    handle(OtherFunc&& of)
    {
        return TemplateDispatcher<
                TemplateDispatcher<PreviousDispatcher, Msg, Func>,
                OtherMsg, OtherFunc>(_queue, this,
                std::forward<OtherFunc>(of));
    }

    /**
     * @brief When this is the last link of the chain, blocks until one
     *        message has been handled.
     *
     * Declared noexcept(false) on purpose: exceptions raised while
     * dispatching (Close from the root Dispatcher, or anything thrown by a
     * callable) leave through this destructor.
     */
    ~TemplateDispatcher() noexcept(false)
    {
        if (!_chained) {
            wait_and_dispatch();
        }
    }

private:
    TemplateDispatcher(TemplateDispatcher const&) = delete;

    TemplateDispatcher& operator=(TemplateDispatcher const&) = delete;

    // Pops messages until one of them is handled; unhandled ones are dropped.
    void wait_and_dispatch()
    {
        bool handled = false;

        while (!handled) {
            handled = dispatch(_queue->wait_and_pop());
        }
    }

    MessageQueue* _queue;
    PreviousDispatcher* _previous;
    Func _function;
    bool _chained;
};

/**
 * @brief Root of a handler chain.
 *
 * It handles no payload itself; its dispatch() only recognises
 * Message<Close>, for which it throws Close.
 */
class Dispatcher {
    // TemplateDispatcher sets _chained and calls the private dispatch().
    template<typename PreviousDispatcher, typename Msg, typename Func>
    friend class TemplateDispatcher;

public:
    /**
     * @brief Takes over the role of @p other; @p other is marked as chained
     *        so that its destructor no longer waits for messages.
     */
    Dispatcher(Dispatcher&& other) noexcept
            :_queue(other._queue), _chained(other._chained)
    { other._chained = true; }

    explicit Dispatcher(MessageQueue* queue)
            :_queue(queue), _chained(false) { }

    /**
     * @brief Starts a chain with a handler for payloads of type Msg.
     * @tparam Msg payload type, given explicitly by the caller
     * @tparam Func callable type, deduced from @p function
     * @return the first TemplateDispatcher link of the chain
     */
    template<typename Msg, typename Func>
    TemplateDispatcher<Dispatcher, Msg, Func> handle(Func&& function)
    {
        return TemplateDispatcher<Dispatcher, Msg, Func>(_queue, this,
                std::forward<Func>(function));
    }

    /**
     * @brief When no handler was chained, discards messages until a
     *        Message<Close> arrives, at which point Close is thrown.
     *
     * Declared noexcept(false) on purpose so that Close can leave through
     * this destructor.
     */
    ~Dispatcher() noexcept(false)
    {
        if (!_chained) {
            wait_and_dispatch();
        }
    }

private:
    Dispatcher(Dispatcher const&) = delete;

    Dispatcher& operator=(Dispatcher const&) = delete;

    // Never returns normally: the loop only ends when dispatch() throws.
    void wait_and_dispatch()
    {
        for (;;) {
            dispatch(_queue->wait_and_pop());
        }
    }

    // Throws Close for a Message<Close>; reports any other message as
    // unhandled.
    bool dispatch(std::shared_ptr<BaseMessage> const& msg)
    {
        if (dynamic_cast<Message<Close>*>(msg.get())) {
            throw Close();
        }
        return false;
    }

    MessageQueue* _queue;
    bool _chained;
};

/**
 * @brief Owner of a MessageQueue; hands out Senders that feed it and
 *        Dispatchers that consume it.
 *
 * The queue is held through std::unique_ptr, so a Receiver cannot be copied
 * (copying the former raw owning pointer led to a double delete). It can be
 * moved: the queue keeps its address, so Senders obtained earlier stay bound
 * to it. A moved-from Receiver owns no queue and must not be used to wait().
 * Senders and Dispatchers hold plain pointers to the queue and must not be
 * used after the owning Receiver is destroyed.
 */
class Receiver {
public:
    Receiver()
            :_queue(new MessageQueue) { }

    /** @brief Returns a Sender bound to this receiver's queue. */
    Sender get_sender() { return Sender(_queue.get()); }

    /**
     * @brief Returns the root Dispatcher of a new handler chain reading
     *        from this receiver's queue.
     */
    Dispatcher wait() { return Dispatcher(_queue.get()); }

private:
    std::unique_ptr<MessageQueue> _queue;
};

}
} // namespace artis common

#endif