Skip to content

Instantly share code, notes, and snippets.

@nikkon-dev
Created November 27, 2019 00:10
Show Gist options
  • Save nikkon-dev/309a6750c6aee6437531cab2ae338466 to your computer and use it in GitHub Desktop.
Save nikkon-dev/309a6750c6aee6437531cab2ae338466 to your computer and use it in GitHub Desktop.
#include <semaphore.h>
#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstdio>
#include <ctime>
#include <memory>
#include <mutex>
#include <system_error>
#include <thread>
#include <vector>
// Serializes every call made through the print()/fprint() helpers below.
// Despite the name it guards whatever stream those helpers write to (fprint
// accepts an arbitrary FILE*), not only stdout.
static std::mutex stdoutmtx;
/// printf-style output to stdout, serialized through `stdoutmtx` so that lines
/// emitted by concurrent threads are not interleaved.
///
/// @param fmt   printf format string.
/// @param args  Values consumed by the conversions in `fmt`.
template <typename... Args>
void print(const char *fmt, Args &&... args) {
  const std::lock_guard<std::mutex> guard{stdoutmtx};
  std::printf(fmt, std::forward<Args>(args)...);
}
/// fprintf-style output to an arbitrary stream, serialized through
/// `stdoutmtx` (the same lock print() takes) so that concurrent callers do not
/// interleave their output.
///
/// @param where  Destination stream.
/// @param fmt    printf format string.
/// @param args   Values consumed by the conversions in `fmt`.
template <typename... Args>
void fprint(FILE *where, const char *fmt, Args &&... args) {
  const std::lock_guard<std::mutex> guard{stdoutmtx};
  std::fprintf(where, fmt, std::forward<Args>(args)...);
}
/// Counting semaphore built on an unnamed POSIX `sem_t` (initial value 0, not
/// shared between processes).
///
/// On top of post/wait the object carries a "marked for destroy" flag: once
/// Destroy() has been called, every operation reports `Destroyed` instead of
/// touching the semaphore. Each wait operation also bumps a counter of
/// in-flight callers; the destructor spins until that counter is back to zero
/// before calling `sem_destroy`.
class LocalSemaphore : public std::enable_shared_from_this<LocalSemaphore> {
 private:
  /// Scope guard that increments the referenced counter on construction and
  /// decrements it on destruction.
  struct WaitersGuard {
    explicit WaitersGuard(std::atomic_uint &val) : m_val(val) { ++m_val; }
    ~WaitersGuard() { --m_val; }
    WaitersGuard(WaitersGuard const &) = delete;
    WaitersGuard &operator=(WaitersGuard const &) = delete;
    WaitersGuard(WaitersGuard &&) = delete;
    WaitersGuard &operator=(WaitersGuard &&) = delete;

   private:
    std::atomic_uint &m_val;
  };

 public:
  /// Outcome of Release(): posted, refused because the semaphore is marked
  /// destroyed, or `sem_post` reported EOVERFLOW.
  enum class [[nodiscard]] ReleaseResult { Ok, Destroyed, Overflow };
  /// Outcome of Wait().
  enum class [[nodiscard]] WaitResult { Ok, Destroyed };
  /// Outcome of TimedWait().
  enum class [[nodiscard]] TimedWaitResult { Ok, Destroyed, TimedOut };
  /// Outcome of TryWait(); `LockNeeded` means the count was zero and the
  /// caller would have to block to acquire a unit.
  enum class [[nodiscard]] AsyncWaitResult { Ok, Destroyed, LockNeeded };

  /// Initializes the underlying semaphore with a count of zero.
  /// @throws std::system_error if `sem_init` fails.
  LocalSemaphore() : m_sem{} {
    fprint(stderr, "LocalSemaphore .ctor\n");
    if (0 != sem_init(&m_sem, 0, 0)) {
      const int err = errno;
      fprint(stderr, "ERROR: LocalSemaphore .ctor %d\n", err);
      throw std::system_error(std::error_code(err, std::generic_category()));
    }
  }

  /// Marks the semaphore destroyed, yields until no wait operation is in
  /// flight any more, then destroys the `sem_t`.
  ~LocalSemaphore() {
    Destroy();
    while (m_numOfWaiters != 0) {
      std::this_thread::yield();
    }
    sem_destroy(&m_sem);
  }

  LocalSemaphore(LocalSemaphore const &) = delete;
  LocalSemaphore &operator=(LocalSemaphore const &) = delete;
  LocalSemaphore(LocalSemaphore &&) = delete;
  LocalSemaphore &operator=(LocalSemaphore &&) = delete;

  /// Sets the destroy flag. It does not wake blocked waiters by itself; they
  /// notice the flag the next time their timed wait expires.
  void Destroy() { m_markForDestroy = true; }

  /// Posts the semaphore `number` times.
  ///
  /// @param number  How many units to release (default 1).
  /// @return `Destroyed` if the semaphore has been marked for destruction (no
  ///         unit is posted), `Overflow` if `sem_post` fails with EOVERFLOW
  ///         (units posted before that point are not rolled back), else `Ok`.
  /// @throws std::system_error for any other `sem_post` failure.
  ReleaseResult Release(std::uint32_t number = 1) {
    fprint(stderr, "Release #1\n");
    // Mirror the wait operations: a semaphore marked for destruction is no
    // longer operated on, and callers are told so.
    if (m_markForDestroy) {
      return ReleaseResult::Destroyed;
    }
    for (std::uint32_t i = 0; i < number; ++i) {
      if (sem_post(&m_sem) != 0) {
        const int err = errno;
        if (err == EOVERFLOW) {
          return ReleaseResult::Overflow;
        }
        fprint(stderr, "ERROR: Release: %d\n", err);
        throw std::system_error(std::error_code(err, std::generic_category()));
      }
    }
    return ReleaseResult::Ok;
  }

  /// Attempts to take one unit without blocking.
  ///
  /// @return `Destroyed` if marked for destruction, `Ok` if a unit was taken,
  ///         `LockNeeded` if the count was zero.
  /// @throws std::system_error if `sem_trywait` fails with anything other
  ///         than EAGAIN (EINTR is retried).
  AsyncWaitResult TryWait() {
    WaitersGuard grd(m_numOfWaiters);
    if (m_markForDestroy) {
      return AsyncWaitResult::Destroyed;
    }
    int err = 0;
    do {
      if (sem_trywait(&m_sem) == 0) {
        return AsyncWaitResult::Ok;
      }
      err = errno;
    } while (err == EINTR);  // interrupted by a signal handler: just retry
    if (err != EAGAIN) {
      fprint(stderr, "ERROR: TryWait %d\n", err);
      throw std::system_error(std::error_code(err, std::generic_category()));
    }
    return AsyncWaitResult::LockNeeded;
  }

  /// Waits for one unit for at most `timeout`.
  ///
  /// @param timeout  Maximum time to block; negative values are treated as
  ///                 zero (a single non-blocking attempt).
  /// @return `Destroyed` if the semaphore was marked for destruction before
  ///         the wait or by the time it timed out, `Ok` if a unit was taken,
  ///         `TimedOut` otherwise.
  /// @throws std::system_error if `clock_gettime` fails or `sem_timedwait`
  ///         fails with anything other than ETIMEDOUT (EINTR is retried).
  TimedWaitResult TimedWait(std::chrono::milliseconds timeout) {
    WaitersGuard grd(m_numOfWaiters);
    if (m_markForDestroy) {
      return TimedWaitResult::Destroyed;
    }

    // sem_timedwait takes an absolute CLOCK_REALTIME deadline.
    timespec ts{};
    if (clock_gettime(CLOCK_REALTIME, &ts) != 0) {
      const int err = errno;
      fprint(stderr, "ERROR: TimedWait (clock_gettime): %d\n", err);
      throw std::system_error(std::error_code(err, std::generic_category()));
    }

    // A negative timeout would drive tv_nsec below zero and make
    // sem_timedwait fail with EINVAL; treat it as "do not block".
    if (timeout < std::chrono::milliseconds::zero()) {
      timeout = std::chrono::milliseconds::zero();
    }

    constexpr long kNanosPerSecond = 1000000000L;
    const auto seconds = std::chrono::duration_cast<std::chrono::seconds>(timeout);
    const auto nanoseconds =
        std::chrono::duration_cast<std::chrono::nanoseconds>(timeout - seconds);
    ts.tv_sec += static_cast<decltype(ts.tv_sec)>(seconds.count());
    ts.tv_nsec += static_cast<decltype(ts.tv_nsec)>(nanoseconds.count());
    // Both addends are below one second, so a single carry is enough.
    if (ts.tv_nsec >= kNanosPerSecond) {
      ts.tv_sec += 1;
      ts.tv_nsec -= kNanosPerSecond;
    }

    int err = 0;
    do {
      if (sem_timedwait(&m_sem, &ts) == 0) {
        return TimedWaitResult::Ok;
      }
      err = errno;
      // The deadline is absolute, so retrying after a signal interruption
      // does not extend the total wait.
    } while (err == EINTR);

    if (err != ETIMEDOUT) {
      fprint(stderr, "ERROR: TimedWait (sem_timedwait): %d\n", err);
      throw std::system_error(std::error_code(err, std::generic_category()));
    }
    if (m_markForDestroy) {
      return TimedWaitResult::Destroyed;
    }
    return TimedWaitResult::TimedOut;
  }

  /// Blocks until a unit is taken or the semaphore is marked destroyed.
  ///
  /// Tries a non-blocking acquire first, then loops over one-second timed
  /// waits so that the destroy flag is re-checked after each timeout.
  ///
  /// @throws std::system_error propagated from TryWait() / TimedWait().
  WaitResult Wait() {
    fprint(stderr, "Wait #1\n");
    // Keeps the in-flight counter non-zero for the whole call, including the
    // gaps between the individual TryWait/TimedWait invocations.
    WaitersGuard grd(m_numOfWaiters);
    switch (TryWait()) {
      case AsyncWaitResult::Ok:
        return WaitResult::Ok;
      case AsyncWaitResult::Destroyed:
        return WaitResult::Destroyed;
      case AsyncWaitResult::LockNeeded:
      default:
        /*do nothing*/
        break;
    }
    while (true) {
      fprint(stderr, "Wait #2: tid: %zu\n",
             std::hash<std::thread::id>()(std::this_thread::get_id()));
      switch (TimedWait(std::chrono::seconds(1))) {
        case TimedWaitResult::Ok:
          return WaitResult::Ok;
        case TimedWaitResult::Destroyed:
          return WaitResult::Destroyed;
        case TimedWaitResult::TimedOut:
          std::this_thread::yield();
          break;
        default:
          break;
      }
    }
  }

 private:
  sem_t m_sem;                                // underlying POSIX semaphore
  std::atomic_bool m_markForDestroy = false;  // set by Destroy()
  std::atomic_uint m_numOfWaiters = 0;        // wait operations in flight
};
/// Demo driver: ten workers block on one shared LocalSemaphore, a helper
/// thread releases a single unit, and after a pause the semaphore is marked
/// destroyed so the remaining workers stop waiting. All threads are joined
/// before the process exits.
int main() {
  constexpr int kWorkerCount = 10;
  std::vector<std::thread> workers;
  workers.reserve(kWorkerCount);
  {
    auto sm = std::make_shared<LocalSemaphore>();
    fprint(stderr, "Main #1\n");
    for (int i = 0; i < kWorkerCount; ++i) {
      workers.emplace_back(
          [sm_wptr = std::weak_ptr<LocalSemaphore>(sm), idx = i]() {
            fprint(stderr, "Worker #%d started\n", idx);
            // A worker that starts after the semaphore is gone simply exits.
            if (auto locked = sm_wptr.lock()) {
              switch (locked->Wait()) {
                case LocalSemaphore::WaitResult::Ok:
                  fprint(stderr, "Worker #%d\n", idx);
                  break;
                case LocalSemaphore::WaitResult::Destroyed:
                  fprint(stderr, "Worker #%d observed destroyed sem\n", idx);
                  break;
                default:
                  break;
              }
            }
          });
    }
    fprint(stderr, "Main #2\n");
    std::thread mainThread([sm] {
      switch (sm->Release()) {
        case LocalSemaphore::ReleaseResult::Ok:
          break;
        case LocalSemaphore::ReleaseResult::Destroyed:
          fprint(stderr, "Main Thread observed destroyed sem\n");
          break;
        default:
          break;
      }
    });
    mainThread.join();
    std::this_thread::sleep_for(std::chrono::seconds(3));
    // Blocked workers each hold a shared_ptr to the semaphore, so dropping
    // `sm` alone never runs its destructor. Mark it explicitly; otherwise the
    // workers that did not get the single released unit would wait forever.
    sm->Destroy();
  }
  // Join instead of detaching: returning from main while workers still run
  // would let them race with process teardown (including the destruction of
  // the static output mutex they log through).
  for (auto &w : workers) {
    w.join();
  }
  return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment