Browse Source

Multiple fixes: multithreading pdevs, output and observations

Eric Ramat 1 year ago
parent
commit
90b1d817fb

+ 4 - 0
src/artis-star/common/ExternalEvent.hpp

@@ -54,6 +54,10 @@ namespace artis {
                     :
                     _port_index(-1), _model(nullptr), _data(data) { }
 
+            ExternalEvent(int port_index)
+                    :
+                    _port_index(port_index), _model(nullptr), _data(Value()) { }
+
             ExternalEvent(int port_index, const Value& data)
                     :
                     _port_index(port_index), _model(nullptr), _data(data) { }

+ 4 - 1
src/artis-star/common/RootCoordinator.hpp

@@ -92,9 +92,12 @@ namespace artis {
                     _tn = _root.start(_tn);
                 }
                 while (_tn <= _t_max) {
+                    typename Time::type new_tn;
+
                     _root.output(_tn);
-                    _tn = _root.transition(_tn);
+                    new_tn = _root.transition(_tn);
                     _observer.observe(_tn);
+                    _tn = new_tn;
                 }
             }
 

+ 9 - 4
src/artis-star/common/observer/Iterator.hpp

@@ -59,10 +59,12 @@ namespace artis {
                     :Iterator<Time>(view), _step(step), _time(start)
             {
                 _last_value = &Iterator<Time>::operator*();
-                while (Iterator<Time>::operator*().first < start
+                while (Iterator<Time>::operator*().first <= start
                         and Iterator<Time>::has_next()) {
                     Iterator<Time>::operator++();
-                    _last_value = &Iterator<Time>::operator*();
+                    if (Iterator<Time>::operator*().first == start) {
+                        _last_value = &Iterator<Time>::operator*();
+                    }
                 }
             }
 
@@ -71,9 +73,12 @@ namespace artis {
             void operator++() override
             {
                 _time += _step;
-                while (Iterator<Time>::operator*().first < _time and Iterator<Time>::has_next()) {
+                _last_value = &Iterator<Time>::operator*();
+                while (Iterator<Time>::operator*().first <= _time and Iterator<Time>::has_next()) {
                     Iterator<Time>::operator++();
-                    _last_value = &Iterator<Time>::operator*();
+                    if (Iterator<Time>::operator*().first == _time) {
+                        _last_value = &Iterator<Time>::operator*();
+                    }
                 }
             }
 

+ 29 - 23
src/artis-star/common/observer/Output.hpp

@@ -23,11 +23,12 @@
 #ifndef ARTIS_COMMON_OBSERVER_OUTPUT_HPP
 #define ARTIS_COMMON_OBSERVER_OUTPUT_HPP
 
+#include <artis-star/common/observer/Iterator.hpp>
 #include <artis-star/common/observer/Observer.hpp>
 #include <artis-star/common/observer/View.hpp>
-#include <artis-star/utils/DateTime.hpp>
 
 #include <boost/format.hpp>
+#include <fstream>
 
 namespace artis {
     namespace observer {
@@ -40,46 +41,51 @@ namespace artis {
 
             virtual ~Output() { }
 
-            void operator()() const
+            void operator()(double begin, double end, double step) const
             {
                 const typename Observer<Time>::Views& views = _observer.views();
 
                 for (typename Observer<Time>::Views::const_iterator it =
                         views.begin(); it != views.end(); ++it) {
                     std::ofstream o((boost::format("%1%.csv") % it->first).str());
-                    typename View<Time>::Values values = it->second->values();
-                    double begin = it->second->begin();
-                    double end = it->second->end();
+                    const typename View<Time>::SelectorValues& values = it->second->values();
+                    std::vector<artis::observer::DiscreteTimeIterator<artis::common::DoubleTime>> its;
 
                     o.precision(10);
                     // write header
                     o << "time";
-                    for (typename View<Time>::Values::const_iterator
-                                 itv = values.begin(); itv != values.end(); ++itv) {
-                        o << ";" << itv->first;
+                    for (typename View<Time>::SelectorValues::const_iterator itv = values.begin();
+                         itv != values.end(); ++itv) {
+                        const typename View<Time>::VariableValues& vv = itv->second;
+
+                        for (typename View<Time>::VariableValues::const_iterator itvv = vv.begin();
+                             itvv != vv.end(); ++itvv) {
+                            o << ";" << itvv->first;
+                            its.push_back(
+                                    artis::observer::DiscreteTimeIterator<artis::common::DoubleTime>(
+                                            itvv->second, begin, step));
+                        }
                     }
                     o << std::endl;
 
                     // write values
-                    for (double t = begin; t <= end; ++t) {
-                        o << utils::DateTime::toJulianDay(t);
-                        // o << t;
-                        for (typename View<Time>::Values::const_iterator itv =
-                                values.begin(); itv != values.end(); ++itv) {
-                            typename View<Time>::Value::const_iterator itp =
-                                    itv->second.begin();
-
-                            while (itp != itv->second.end() and itp->first < t) {
-                                ++itp;
-                            }
-                            o << ";";
-                            if (itp != itv->second.end()) {
-                                o << itp->second;
+                    bool more = true;
+                    double t = begin;
+
+                    while (more and t <= end) {
+                        more = false;
+                        o << t;
+                        for (auto& dtt: its) {
+                            if (dtt.has_next()) {
+                                o << ";" << (*dtt).second.to_string();
+                                ++dtt;
+                                more = true;
                             } else {
-                                o << "NA";
+                                o << ";";
                             }
                         }
                         o << std::endl;
+                        t += step;
                     }
                 }
             }

+ 10 - 5
src/artis-star/common/utils/Multithreading.hpp

@@ -51,6 +51,8 @@ namespace artis {
 
         class MessageQueue {
         public:
+            MessageQueue() = default;
+
             template<typename T>
             void push(T const& msg)
             {
@@ -116,7 +118,8 @@ namespace artis {
                     Func&& function)
                     :
                     _queue(queue), _previous(previous),
-                    _function(std::forward<Func>(function)), _chained(false) { previous->_chained = true; }
+                    _function(std::forward<Func>(function)),
+                    _chained(false) { previous->_chained = true; }
 
             bool dispatch(std::shared_ptr<BaseMessage> const& msg)
             {
@@ -227,14 +230,16 @@ namespace artis {
 
         class Receiver {
         public:
-            Receiver() { }
+            Receiver() : _queue(new MessageQueue) { }
+
+            ~Receiver() { delete _queue; }
 
-            operator Sender() { return Sender(&_queue); }
+            Sender get_sender() { return Sender(_queue); }
 
-            Dispatcher wait() { return Dispatcher(&_queue); }
+            Dispatcher wait() { return Dispatcher(_queue); }
 
         private:
-            MessageQueue _queue;
+            MessageQueue* _queue;
         };
 
     }

+ 8 - 10
src/artis-star/kernel/pdevs/Coordinator.hpp

@@ -152,7 +152,7 @@ namespace artis {
                 common::Trace<Time>::trace()
                         << common::TraceElement<Time>(type::get_name(), t,
                                 common::FormalismType::PDEVS,
-                                common::FunctionType::S_MESSAGE,
+                                common::FunctionType::OUTPUT,
                                 common::LevelType::FORMALISM)
                         << ": BEFORE => " << "tl = " << type::_tl << " ; tn = "
                         << type::_tn << " ; scheduler = " << _event_table.to_string();
@@ -167,7 +167,7 @@ namespace artis {
                 common::Trace<Time>::trace()
                         << common::TraceElement<Time>(type::get_name(), t,
                                 common::FormalismType::PDEVS,
-                                common::FunctionType::S_MESSAGE,
+                                common::FunctionType::OUTPUT,
                                 common::LevelType::FORMALISM)
                         << ": IMM = " << IMM.to_string();
                 common::Trace<Time>::trace().flush();
@@ -181,7 +181,7 @@ namespace artis {
                 common::Trace<Time>::trace()
                         << common::TraceElement<Time>(type::get_name(), t,
                                 common::FormalismType::PDEVS,
-                                common::FunctionType::S_MESSAGE,
+                                common::FunctionType::OUTPUT,
                                 common::LevelType::FORMALISM)
                         << ": AFTER => " << "tl = " << type::_tl << " ; tn = "
                         << type::_tn << " ; scheduler = " << _event_table.to_string();
@@ -235,12 +235,11 @@ namespace artis {
                     _event_table.put(model->transition(t), model);
                 }
                 for (auto& model : IMM) {
-                    if (std::find(receivers.begin(), receivers.end(),
-                            model) == receivers.end()) {
+                    if (std::find(receivers.begin(), receivers.end(), model) == receivers.end()) {
                         _event_table.put(model->transition(t), model);
                     }
                 }
-                update_event_table(t);
+//                update_event_table(t);
                 type::_tl = t;
                 type::_tn = _event_table.get_current_time();
                 type::clear_bag();
@@ -259,8 +258,7 @@ namespace artis {
                 return type::_tn;
             }
 
-            void post_event(const typename Time::type& t,
-                    const common::ExternalEvent<Time>& event)
+            void post_event(const typename Time::type& t, const common::ExternalEvent<Time>& event)
             {
 
 #ifdef WITH_TRACE
@@ -275,7 +273,7 @@ namespace artis {
 
                 type::add_event(event);
                 _graph_manager.post_event(t, event);
-                update_event_table(t);
+//                update_event_table(t);
                 type::_tn = _event_table.get_current_time();
 
 #ifdef WITH_TRACE
@@ -307,7 +305,7 @@ namespace artis {
 #endif
 
                 _graph_manager.dispatch_events(bag, t);
-                update_event_table(t);
+//                update_event_table(t);
                 type::_tn = _event_table.get_current_time();
 
 #ifdef WITH_TRACE

+ 5 - 4
src/artis-star/kernel/pdevs/Simulator.hpp

@@ -154,8 +154,10 @@ namespace artis {
                 return type::_tn;
             }
 
-            common::Value observe(const typename Time::type& t,
-                    unsigned int index) const { return _dynamics.observe(t, index); }
+            common::Value observe(const typename Time::type& t, unsigned int index) const
+            {
+                return _dynamics.observe(t, index);
+            }
 
             virtual std::string observable_name(unsigned int observable_index) const
             {
@@ -204,8 +206,7 @@ namespace artis {
 
             }
 
-            void post_event(const typename Time::type& t,
-                    const common::ExternalEvent<Time>& event)
+            void post_event(const typename Time::type& t, const common::ExternalEvent<Time>& event)
             {
 
 #ifndef WITH_TRACE

+ 2 - 1
src/artis-star/kernel/pdevs/multithreading/Coordinator.hpp

@@ -109,7 +109,7 @@ namespace artis {
 
                 void done() { get_sender().send(artis::common::Close()); }
 
-                artis::common::Sender get_sender() { return _incoming; }
+                artis::common::Sender get_sender() { return _incoming.get_sender(); }
 
                 void set_sender(common::Sender sender) { _sender = sender; }
 
@@ -211,6 +211,7 @@ namespace artis {
                     if (_received > 0) {
                         _received_mutex.lock();
                         type::_graph_manager.transition(receivers, t);
+                        type::_graph_manager.transition(IMM, t);
 
                         std::lock_guard<std::mutex> lock(_received_mutex);
                     }