Created
October 27, 2011 20:15
-
-
Save mpusz/1320743 to your computer and use it in GitHub Desktop.
Simple TTCN TE implementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "utils.h"

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <exception>
#include <functional>
#include <future>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <set>
#include <stdexcept>
#include <string>
#include <tuple>
#include <utility>
#include <vector>
// A message exchanged between components: <name of the sending component, integer payload>.
using CMessage = std::tuple<std::string, int>;
class CLogging : CNonCopyable { | |
CConcurrent<std::ostream &> _logger; | |
void Write(const std::string &msg) const | |
{ | |
_logger([=](std::ostream & out){ out << msg << std::endl; }); | |
} | |
public: | |
CLogging() : _logger{std::cout} { } | |
void LogCreated(const std::string &comp) const | |
{ Write("Component '" + comp + "' created"); } | |
void LogConnected(const std::string &comp1, const std::string &comp2) const | |
{ Write("Components '" + comp1 + "' and '" + comp2 + "' connected"); } | |
void LogStarting(const std::string &comp) const | |
{ Write("Starting component '" + comp + "'"); } | |
void LogSending(const std::string &comp, const CMessage &m) const | |
{ Write("Component '" + std::get < 0 > (m) + "' is sending '" + std::to_string(std::get < 1 > (m)) + "' to '" + comp + "'"); } | |
void LogReceived(const std::string &comp, const CMessage &m) const | |
{ Write("'" + std::to_string(std::get < 1 > (m)) + "' received by Component '" + comp + "'"); } | |
template<typename Rep, typename Period> | |
void LogTimeout(const std::string &comp, const std::chrono::duration<Rep, Period> &duration) const | |
{ Write("'" + std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count()) + "ms' expired while waiting for data in '" + comp + "'"); } | |
void LogStopped(const std::string &comp) const | |
{ Write("Component '" + comp + "' stopped"); } | |
void LogKilled(const std::string &comp) const | |
{ Write("Component '" + comp + "' killed"); } | |
void LogVerdict(const std::string &comp, bool verdict) const | |
{ Write("Component '" + comp + "' verdict '" + std::string(verdict ? "PASS" : "FAIL") + "'"); } | |
void LogOther(const std::string &str) const | |
{ Write(str); } | |
}; | |
class CComponent; | |
class CBehaviour; | |
class CComponentHandler : CNonCopyable { | |
typedef std::map<std::string, std::shared_ptr<CComponent>> CComponentMap; | |
const CLogging &_log; | |
CMonitor<CComponentMap> _componentMap; | |
std::shared_ptr<CComponent> Component(const std::string &name) const | |
{ | |
try { | |
return _componentMap([&](CComponentMap &map) { return map.at(name); }); | |
} | |
catch(const std::out_of_range &) { | |
throw std::invalid_argument("Component with name '" + name + "' not found"); | |
} | |
} | |
public: | |
CComponentHandler(const CLogging &log): _log(log) {} | |
void Create(const std::string &name); | |
void Connect(const std::string &name1, const std::string &name2); | |
void Start(const std::string &name, const CBehaviour &behaviour); | |
void Send(const std::string &name, const CMessage &m) const; | |
void Done(const std::string &name) const; | |
bool Verdict(const std::string &name) const; | |
}; | |
class CBehaviour { | |
CComponentHandler &_ch; | |
protected: | |
CComponentHandler &CH() const { return _ch; } | |
public: | |
CBehaviour(CComponentHandler &ch): _ch(ch) {} | |
virtual void Run(CComponent &comp) const = 0; | |
}; | |
// Caught by CComponent::RunThread, which then marks the component as STOPPED.
struct EThreadStopped : std::exception {};

// Thrown by CEventQueue::PopWait when the kill flag is set and no event is pending;
// CComponent::RunThread translates it into the KILLED status.
struct EThreadKilled : std::exception {};
class CEventObject;

// An event: <id chosen by the object that raised it, pointer to that object>.
using CEvent = std::tuple<unsigned, CEventObject *>;

// Ordered collection of events.
using CEventList = std::vector<CEvent>;
class CEventQueue { | |
static const std::chrono::milliseconds RESOLUTION; | |
typedef std::queue<CEvent> CQueue; | |
const std::atomic_bool &_kill; | |
CQueue _queue; | |
std::mutex _mutex; | |
std::condition_variable _newItemReady; | |
public: | |
CEventQueue(const std::atomic_bool &killed): _kill(killed) {} | |
std::mutex &Mutex() { return _mutex; } | |
void PushLocked(CEvent event) | |
{ | |
bool wasEmpty = _queue.empty(); | |
_queue.push(event); | |
if(wasEmpty) | |
_newItemReady.notify_one(); | |
} | |
CEvent PopWait() | |
{ | |
std::unique_lock<std::mutex> lock(_mutex); | |
if(_queue.empty()) { | |
while(true) { | |
if(_newItemReady.wait_for(lock, RESOLUTION, | |
[&_queue, &_kill]{ return _queue.size() || _kill;})) | |
break; | |
} | |
} | |
if(_queue.size()) { | |
CEvent event = _queue.front(); | |
_queue.pop(); | |
return event; | |
} | |
throw EThreadKilled(); | |
} | |
}; | |
const std::chrono::milliseconds CEventQueue::RESOLUTION(100); | |
class CPort; | |
class CEventObject { | |
CEventQueue &_eventQueue; | |
CEventList &_stuckEvents; | |
public: | |
CEventObject(CEventQueue &eventQueue, CEventList &stuckEvents): | |
_eventQueue(eventQueue), _stuckEvents(stuckEvents) {} | |
virtual ~CEventObject() | |
{ | |
// make sure that objects is not on stuck events list anymore | |
std::remove_if(begin(_stuckEvents), end(_stuckEvents), | |
[this](CEvent &event){ return std::get<1>(event) == this; }); | |
} | |
CEventQueue &EventQueue() const { return _eventQueue; } | |
CEventList &StuckEvents() const { return _stuckEvents; } | |
virtual void StuckEventListUpdate(unsigned id) = 0; | |
virtual bool Receive(const CPort *port = nullptr, | |
const CMessage *templ = nullptr, | |
const std::string &from = "", | |
std::function<void(const CMessage &)> &&value = nullptr, | |
std::string *sender = nullptr) | |
{ | |
return false; | |
} | |
}; | |
// Compatibility shim for libstdc++ releases older than GCC 4.7, which shipped the
// clock under its pre-standard name std::chrono::monotonic_clock. Every conforming
// library already provides std::chrono::steady_clock, and declaring names in
// namespace std is not permitted otherwise, so the alias is limited to those compilers.
#if defined(__GNUC__) && !defined(__clang__) && (__GNUC__ == 4) && (__GNUC_MINOR__ < 7)
namespace std {
namespace chrono {
typedef monotonic_clock steady_clock;
}
}
#endif
class CTimer : public CEventObject { | |
typedef std::chrono::steady_clock CClock; | |
const CClock::duration _duration; | |
CClock::time_point _start; | |
public: | |
template<typename Rep, typename Period> | |
explicit CTimer(std::chrono::duration<Rep, Period> duration, CEventQueue &eventQueue, CEventList &stuckEvents): | |
CEventObject{eventQueue, stuckEvents}, _duration{std::chrono::duration_cast<CClock::duration>(duration)} {} | |
void Start() { _start = CClock::now(); } | |
bool Timeout() const { return CClock::now() - _start > _duration; } | |
virtual void StuckEventListUpdate(unsigned id) {} | |
}; | |
class CPort : public CEventObject { | |
public: | |
typedef std::vector<std::string> CPeers; | |
private: | |
typedef std::queue<std::tuple<unsigned, CMessage>> CMessageQueue; | |
const std::string _name; | |
const CComponentHandler &_ch; | |
CPeers _peers; | |
CMessageQueue _inputQueue; | |
std::mutex _inputQueueMutex; | |
unsigned _nextId; | |
public: | |
CPort(const std::string &name, const CComponentHandler &ch, CEventQueue &eventQueue, CEventList &stuckEvents): | |
CEventObject{eventQueue, stuckEvents}, _name{name}, _ch(ch), _nextId{0} {} | |
const std::string &Name() const { return _name; } | |
const CPeers &Peers() const { return _peers; } | |
void Connect(const std::string &name) { _peers.push_back(name); } | |
void Enqueue(const CMessage &m) | |
{ | |
std::unique_lock<std::mutex> inputQueueLock(_inputQueueMutex, std::defer_lock); | |
std::unique_lock<std::mutex> eventQueueLock(EventQueue().Mutex(), std::defer_lock); | |
std::lock(inputQueueLock, eventQueueLock); | |
unsigned id = _nextId++; | |
_inputQueue.push(std::make_tuple(id, m)); | |
EventQueue().PushLocked(CEvent(id, this)); | |
} | |
void Send(const std::string &name, const CMessage &m) { _ch.Send(name, m); } | |
bool Receive(const CPort *port = nullptr, | |
const CMessage *templ = nullptr, | |
const std::string &from = "", | |
std::function<void(const CMessage &)> &&value = nullptr, | |
std::string *sender = nullptr) | |
{ | |
if(port && port != this) | |
return false; | |
std::lock_guard<std::mutex> lock(_inputQueueMutex); | |
if(_inputQueue.size()) { | |
const CMessage &msg = std::get<1>(_inputQueue.front()); | |
if((!templ || msg == *templ) && (from == "" || std::get<0>(msg) == from)) { | |
if(value) | |
value(msg); | |
if(sender) | |
*sender = std::get<0>(msg); | |
_inputQueue.pop(); | |
return true; | |
} | |
} | |
return false; | |
} | |
virtual void StuckEventListUpdate(unsigned id) | |
{ | |
CEvent event{id, this}; | |
CEventList &stuckEvents = StuckEvents(); | |
auto it = std::find_if(begin(stuckEvents), end(stuckEvents), | |
[&event](CEvent &e) { return e == event;}); | |
bool stucked = false; | |
{ | |
std::lock_guard<std::mutex> lock{_inputQueueMutex}; | |
stucked = (_inputQueue.size() && std::get<0>(_inputQueue.front()) == id); | |
} | |
if(stucked) { | |
if(it == stuckEvents.end()) | |
stuckEvents.push_back(event); | |
} | |
else { | |
if(it != stuckEvents.end()) | |
stuckEvents.erase(it); | |
} | |
} | |
}; | |
class CComponent { | |
public: | |
enum class TStatus { | |
RUNNING, | |
STOPPED, | |
KILLED | |
}; | |
private: | |
const std::string _name; | |
const CLogging &_log; | |
std::atomic_bool _kill; | |
CEventQueue _eventQueue; | |
CEventList _stuckEvents; | |
CPort _port; | |
mutable std::mutex _mutex; | |
TStatus _status; | |
bool _verdict; | |
std::future<void> _result; | |
void RunThread(const CBehaviour &behaviour) | |
{ | |
_log.LogStarting(_name); | |
Status(TStatus::RUNNING); | |
try { | |
behaviour.Run(*this); | |
Status(TStatus::STOPPED); | |
_log.LogStopped(_name); | |
} | |
catch(EThreadStopped &) { | |
Status(TStatus::STOPPED); | |
_log.LogStopped(_name); | |
} | |
catch(EThreadKilled &) { | |
Status(TStatus::KILLED); | |
_log.LogKilled(_name); | |
} | |
} | |
void Status(TStatus status) | |
{ | |
std::lock_guard<std::mutex> lock(_mutex); | |
_status = status; | |
} | |
public: | |
CComponent(const std::string &name, const CComponentHandler &ch, const CLogging &log) : | |
_name{name}, _log(log), _kill{false}, | |
_eventQueue{_kill}, _port{name + ".0", ch, _eventQueue, _stuckEvents}, | |
_status{TStatus::STOPPED}, _verdict{false} | |
{ | |
} | |
~CComponent() | |
{ | |
if(Status() == TStatus::RUNNING) { | |
_kill.store(true, std::memory_order_relaxed); | |
_result.wait(); | |
} | |
} | |
const std::string &Name() const { return _name; } | |
CEventQueue &EventQueue() { return _eventQueue; } | |
CEventList &StuckEvents() { return _stuckEvents; } | |
TStatus Status() const | |
{ | |
std::lock_guard<std::mutex> lock(_mutex); | |
return _status; | |
} | |
CPort &Port() { return _port; } | |
void Start(const CBehaviour &behaviour) | |
{ | |
_result = std::async(std::launch::async, [&]{ CComponent::RunThread(behaviour); }); | |
} | |
void Verdict(bool verdict) | |
{ | |
_verdict = verdict; | |
_log.LogVerdict(_name, verdict); | |
} | |
bool Verdict() { return _verdict; } | |
void Done() { _result.wait(); } | |
}; | |
void CComponentHandler::Create(const std::string &name) | |
{ | |
auto comp = std::make_shared<CComponent>(name, *this, _log); | |
_componentMap([&](CComponentMap &map){ map[name] = std::move(comp); }); | |
_log.LogCreated(name); | |
} | |
void CComponentHandler::Connect(const std::string &name1, const std::string &name2) | |
{ | |
auto comp1 = Component(name1); | |
auto comp2 = Component(name2); | |
comp1->Port().Connect(name2); | |
comp2->Port().Connect(name1); | |
_log.LogConnected(name1, name2); | |
} | |
void CComponentHandler::Start(const std::string &name, const CBehaviour &behaviour) | |
{ | |
Component(name)->Start(behaviour); | |
} | |
void CComponentHandler::Send(const std::string &name, const CMessage &m) const | |
{ | |
_log.LogSending(name, m); | |
Component(name)->Port().Enqueue(m); | |
} | |
void CComponentHandler::Done(const std::string &name) const | |
{ | |
Component(name)->Done(); | |
} | |
bool CComponentHandler::Verdict(const std::string &name) const | |
{ | |
return Component(name)->Verdict(); | |
} | |
class CAlt { | |
typedef std::tuple<std::function<bool(CEventObject &)>, std::function<void()> > CBranchData; | |
typedef std::deque<CBranchData> CBranches; | |
CEventQueue &_eventQueue; | |
CEventList &_stuckEvents; | |
CBranches _branches; | |
bool _repeat; | |
public: | |
CAlt(CEventQueue &eventQueue, CEventList &stuckEvents): | |
_eventQueue(eventQueue), _stuckEvents(stuckEvents), _repeat{false} | |
{ | |
} | |
void Add(std::function<bool(CEventObject &)> &&predicate, std::function<void()> &&action) | |
{ | |
_branches.push_back(std::make_tuple(predicate, action)); | |
} | |
void Repeat() | |
{ | |
_repeat = true; | |
} | |
// void Run() | |
void Run(CComponent &comp) | |
{ | |
bool found; | |
do { | |
typedef std::set<CEventObject *> EventObjectSet; | |
EventObjectSet checkedObjects; | |
CEvent event; | |
CBranches::iterator it; | |
bool newEvent = false; | |
found = false; | |
// process old events | |
for(auto e : _stuckEvents) { | |
event = e; | |
checkedObjects.insert(std::get<1>(event)); | |
it = std::find_if(begin(_branches), end(_branches), | |
[&event](const CBranchData &d) | |
{ return std::get<0>(d)(*std::get<1>(event)); }); | |
if(it != _branches.end()) { | |
found = true; | |
break; | |
} | |
} | |
if(!found) { | |
// process new event | |
event = _eventQueue.PopWait(); | |
newEvent = true; | |
// check if not a stucked object | |
if(checkedObjects.find(std::get<1>(event)) == checkedObjects.end()) { | |
it = std::find_if(begin(_branches), end(_branches), | |
[&event](const CBranchData &d) | |
{ return std::get<0>(d)(*std::get<1>(event)); }); | |
if(it != _branches.end()) | |
found = true; | |
} | |
} | |
if(found) { | |
// run the action | |
_repeat = false; | |
std::get<1>(*it)(); | |
} | |
if(newEvent) { | |
// if that alternate does not wait on that object or an event was not removed from | |
// object internal events queue than add that event to the stuck events list | |
std::get<1>(event)->StuckEventListUpdate(std::get<0>(event)); | |
} | |
} | |
while(_repeat || !found); | |
} | |
}; | |
class CBehaviourServer : public CBehaviour { | |
public: | |
CBehaviourServer(CComponentHandler &ch): CBehaviour{ch} { } | |
void Run(CComponent &comp) const | |
{ | |
CAlt alt{comp.EventQueue(), comp.StuckEvents()}; | |
CMessage msg; | |
alt.Add([&](CEventObject &obj) | |
{ return obj.Receive(nullptr, nullptr, "", [&](const CMessage &m) { msg = m; }, nullptr); }, | |
[&]{ | |
CH().Send(std::get<0>(msg), CMessage(comp.Name(), std::get<1>(msg) * std::get<1>(msg))); | |
alt.Repeat(); | |
}); | |
alt.Run(comp); | |
} | |
}; | |
class CBehaviourClient : public CBehaviour { | |
static const int RETRY_NUM = 10; | |
public: | |
CBehaviourClient(CComponentHandler &ch): CBehaviour(ch) {} | |
void Run(CComponent &comp) const | |
{ | |
const std::chrono::milliseconds MSG_TIMEOUT(500); | |
const int RETRY_NUM = 5; | |
CPort &port = comp.Port(); | |
CTimer t(MSG_TIMEOUT, comp.EventQueue(), comp.StuckEvents()); | |
int count = 0; | |
port.Send(port.Peers().front(), CMessage(comp.Name(), count)); | |
t.Start(); | |
CAlt alt(comp.EventQueue(), comp.StuckEvents()); | |
CMessage m(port.Peers().front(), count * count); | |
alt.Add([&](CEventObject &obj) | |
{ return obj.Receive(&port, &m); }, | |
[&]{ | |
comp.Verdict(true); | |
}); | |
alt.Add([&](CEventObject &obj) | |
{ return obj.Receive(&port); }, | |
[&]{ | |
comp.Verdict(false); | |
}); | |
alt.Add([&](CEventObject &obj) | |
{ return count < RETRY_NUM && t.Timeout(); }, | |
[&]{ | |
count = count + 1; | |
port.Send(port.Peers().front(), CMessage(comp.Name(), count)); | |
alt.Repeat(); | |
}); | |
alt.Add([&](CEventObject &obj) | |
{ return count == RETRY_NUM && t.Timeout(); }, | |
[&]{ | |
comp.Verdict(false); | |
}); | |
alt.Run(comp); | |
} | |
}; | |
class CBehaviourTest : public CBehaviour { | |
constexpr static unsigned CLIENTS_COUNT = 5; | |
public: | |
CBehaviourTest(CComponentHandler &ch): CBehaviour(ch) {} | |
void Run(CComponent &comp) const | |
{ | |
// create clients | |
for(unsigned i=0; i<CLIENTS_COUNT; i++) | |
CH().Create("client_" + std::to_string(i)); | |
// connect clients | |
for(unsigned i=0; i<CLIENTS_COUNT; i++) | |
CH().Connect("client_" + std::to_string(i), comp.Port().Peers().front()); | |
// start clients behavior | |
CBehaviourClient clientBehaviour(CH()); | |
for(unsigned i=0; i<CLIENTS_COUNT; i++) | |
CH().Start("client_" + std::to_string(i), clientBehaviour); | |
// wait for clients finish | |
for(unsigned i=0; i<CLIENTS_COUNT; i++) | |
CH().Done("client_" + std::to_string(i)); | |
// obtain clients verdict | |
bool verdict = true; | |
for(unsigned i=0; i<CLIENTS_COUNT; i++) | |
verdict &= CH().Verdict("client_" + std::to_string(i)); | |
comp.Verdict(verdict); | |
} | |
}; | |
int main() | |
{ | |
CLogging log; | |
log.LogOther("Creating components"); | |
CComponentHandler ch(log); | |
ch.Create("server"); | |
ch.Create("test"); | |
ch.Connect("server", "test"); | |
log.LogOther("Running components"); | |
CBehaviourServer serverBehavior(ch); | |
ch.Start("server", serverBehavior); | |
CBehaviourTest testBehavior(ch); | |
ch.Start("test", testBehavior); | |
ch.Done("test"); | |
std::string verdict(ch.Verdict("test") ? "PASS" : "FAIL"); | |
log.LogOther("Verdict: " + verdict); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef UTILS_H | |
#define UTILS_H | |
#include <thread> | |
#include <future> | |
#include <queue> | |
// Base class that removes copy construction and copy assignment from every class
// deriving from it; default construction stays available.
class CNonCopyable {
public:
    CNonCopyable(const CNonCopyable &) = delete;
    CNonCopyable &operator=(const CNonCopyable &) = delete;

    CNonCopyable() = default;
};
// Monitor wrapper: owns a value of type T together with a mutex and only exposes the
// value to callables passed to operator(), which are invoked with the mutex held.
template<class T>
class CMonitor {
    mutable T _t;
    mutable std::mutex _m;

public:
    CMonitor(T t = T{}) : _t(t) {}

    // Invokes f(_t) under the lock and returns whatever f returns.
    template<typename F>
    auto operator()(F f) const -> decltype(f(_t))
    {
        std::lock_guard<std::mutex> guard(_m);
        return f(_t);
    }
};
template<typename T> | |
class CWaitQueue : CNonCopyable { | |
std::queue<T> _queue; | |
std::condition_variable _newItemReady; | |
std::mutex _mutex; | |
public: | |
CWaitQueue() = default; | |
void Push(T &&msg) | |
{ | |
std::lock_guard<std::mutex> lock{_mutex}; | |
bool wasEmpty = _queue.empty(); | |
_queue.emplace(std::move(msg)); | |
if(wasEmpty) | |
_newItemReady.notify_one(); | |
} | |
T PopWait() | |
{ | |
std::unique_lock<std::mutex> lock{_mutex}; | |
_newItemReady.wait(lock, [&]{ return !_queue.empty(); }); | |
auto msg = std::move(_queue.front()); | |
_queue.pop(); | |
return msg; | |
} | |
}; | |
namespace details {

// Fulfils `p` with the value produced by calling f(t).
template<typename Result, typename Func, typename State>
void SetValue(std::promise<Result> &p, Func &f, State &t)
{
    p.set_value(f(t));
}

// Overload for callables without a result: calls f(t) first and then marks `p` ready.
template<typename Func, typename State>
void SetValue(std::promise<void> &p, Func &f, State &t)
{
    f(t);
    p.set_value();
}

}
template<class T> | |
class CConcurrent { | |
mutable T _t; | |
mutable CWaitQueue<std::function<void()>> _queue; | |
bool _done; | |
std::thread _thread; | |
public: | |
CConcurrent(T t = T{}): | |
_t(t), | |
_done{false}, | |
_thread{[=]{ while(!_done) _queue.PopWait()(); }} | |
{ | |
} | |
~CConcurrent() | |
{ | |
_queue.Push([=]{ _done = true; }); | |
_thread.join(); | |
} | |
template<typename F> | |
auto operator()(F f) const -> std::future<decltype(f(_t))> | |
{ | |
auto p = std::make_shared<std::promise<decltype(f(_t))>>(); | |
auto ret = p->get_future(); | |
_queue.Push([=]{ | |
try { | |
details::SetValue(*p, f, _t); | |
} | |
catch(...) { | |
p->set_exception(std::current_exception()); | |
} | |
}); | |
return ret; | |
} | |
}; | |
#endif // UTILS_H |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment