Created
December 22, 2023 12:43
-
-
Save JuanDiegoMontoya/1b5825e217f4886fb94807cd129e33c4 to your computer and use it in GitHub Desktop.
test of sync primitive for libcoro: this semaphore can be incremented and decremented by an arbitrary amount (instead of just 1), which makes it useful for rate-limiting resources
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <coro/coro.hpp> | |
namespace test | |
{ | |
class semaphore | |
{ | |
public: | |
enum class acquire_result | |
{ | |
acquired, | |
semaphore_stopped | |
}; | |
static std::string acquire_result_acquired; | |
static std::string acquire_result_semaphore_stopped; | |
static std::string acquire_result_unknown; | |
static auto to_string(acquire_result ar) -> const std::string& | |
{ | |
switch (ar) | |
{ | |
case acquire_result::acquired: return acquire_result_acquired; | |
case acquire_result::semaphore_stopped: return acquire_result_semaphore_stopped; | |
} | |
return acquire_result_unknown; | |
} | |
explicit semaphore(std::ptrdiff_t least_max_value_and_starting_value); | |
explicit semaphore(std::ptrdiff_t least_max_value, std::ptrdiff_t starting_value); | |
~semaphore(); | |
semaphore(const semaphore&) = delete; | |
semaphore(semaphore&&) = delete; | |
auto operator=(const semaphore&) noexcept -> semaphore& = delete; | |
auto operator=(semaphore&&) noexcept -> semaphore& = delete; | |
class acquire_operation | |
{ | |
public: | |
explicit acquire_operation(semaphore& s, ptrdiff_t count); | |
auto await_ready() const noexcept -> bool; | |
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool; | |
auto await_resume() const -> acquire_result; | |
private: | |
friend semaphore; | |
semaphore& m_semaphore; | |
ptrdiff_t m_count; | |
std::coroutine_handle<> m_awaiting_coroutine; | |
acquire_operation* m_next{nullptr}; | |
}; | |
auto release(ptrdiff_t count) -> void; | |
/** | |
* Acquires a resource from the semaphore, if the semaphore has no resources available then | |
* this will wait until a resource becomes available. | |
*/ | |
[[nodiscard]] auto acquire(ptrdiff_t count) -> acquire_operation | |
{ | |
return acquire_operation{*this, count}; | |
} | |
/** | |
* Attemtps to acquire a resource if there is any resources available. | |
* @return True if the acquire operation was able to acquire a resource. | |
*/ | |
auto try_acquire(ptrdiff_t count) -> bool; | |
/** | |
* @return The maximum number of resources the semaphore can contain. | |
*/ | |
auto max() const noexcept -> std::ptrdiff_t | |
{ | |
return m_least_max_value; | |
} | |
/** | |
* The current number of resources available in this semaphore. | |
*/ | |
auto value() const noexcept -> std::ptrdiff_t | |
{ | |
return m_counter.load(std::memory_order::relaxed); | |
} | |
/** | |
* Stops the semaphore and will notify all release/acquire waiters to wake up in a failed state. | |
* Once this is set it cannot be un-done and all future oprations on the semaphore will fail. | |
*/ | |
auto notify_waiters() noexcept -> void; | |
private: | |
friend class release_operation; | |
friend class acquire_operation; | |
const std::ptrdiff_t m_least_max_value; | |
std::atomic<std::ptrdiff_t> m_counter; | |
std::mutex m_waiter_mutex{}; | |
acquire_operation* m_acquire_waiters{nullptr}; | |
std::atomic<bool> m_notify_all_set{false}; | |
}; | |
} // namespace test
namespace test | |
{ | |
using namespace std::string_literals; | |
std::string semaphore::acquire_result_acquired = "acquired"s; | |
std::string semaphore::acquire_result_semaphore_stopped = "semaphore_stopped"s; | |
std::string semaphore::acquire_result_unknown = "unknown"s; | |
// Convenience constructor: the same value is used as both the maximum and the starting count.
semaphore::semaphore(std::ptrdiff_t least_max_value_and_starting_value)
    : semaphore(least_max_value_and_starting_value, least_max_value_and_starting_value)
{
}
semaphore::semaphore(std::ptrdiff_t least_max_value, std::ptrdiff_t starting_value) | |
: m_least_max_value(least_max_value), m_counter(starting_value <= least_max_value ? starting_value : least_max_value) | |
{ | |
} | |
// Destroying the semaphore stops it, which resumes any coroutine still queued as a waiter.
semaphore::~semaphore()
{
    this->notify_waiters();
}
semaphore::acquire_operation::acquire_operation(semaphore& s, ptrdiff_t count) : m_semaphore(s), m_count(count) {} | |
auto semaphore::acquire_operation::await_ready() const noexcept -> bool | |
{ | |
if (m_semaphore.m_notify_all_set.load(std::memory_order::relaxed)) | |
{ | |
return true; | |
} | |
return m_semaphore.try_acquire(m_count); | |
} | |
// Slow path, entered when await_ready() returned false.
//
// Under the waiter mutex the stop flag and the counter are checked once more; if either lets
// the coroutine proceed, `false` is returned and the coroutine is resumed immediately.
// Otherwise this operation is pushed onto the front of the semaphore's waiter list and `true`
// is returned so the coroutine stays suspended until the semaphore resumes it.
//
// @param awaiting_coroutine Handle of the coroutine that is awaiting this operation.
// @return True if the coroutine was queued and must remain suspended, false otherwise.
auto semaphore::acquire_operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
{
    std::unique_lock lk{m_semaphore.m_waiter_mutex};

    if (m_semaphore.m_notify_all_set.load(std::memory_order::relaxed))
    {
        // The semaphore was stopped in the meantime; await_resume() reports the failure.
        return false;
    }

    if (m_semaphore.try_acquire(m_count))
    {
        // Resources became available between await_ready() and now.
        return false;
    }

    // Store the handle before the node becomes reachable through the list.
    m_awaiting_coroutine = awaiting_coroutine;

    // Push to the front. This is LIFO, but semaphores are not meant to be fair.
    // m_next is always overwritten, even when the list is empty, so that an operation object
    // that is awaited more than once never carries a stale link from an earlier enqueue.
    m_next                        = m_semaphore.m_acquire_waiters;
    m_semaphore.m_acquire_waiters = this;
    return true;
}
// Produces the result of the co_await expression: a stopped semaphore always yields
// semaphore_stopped, otherwise the operation is reported as acquired.
auto semaphore::acquire_operation::await_resume() const -> acquire_result
{
    const bool stopped = m_semaphore.m_notify_all_set.load(std::memory_order::relaxed);
    return stopped ? acquire_result::semaphore_stopped : acquire_result::acquired;
}
auto semaphore::release(ptrdiff_t count) -> void | |
{ | |
// It seems like the atomic counter could be incremented, but then resuming a waiter could have | |
// a race between a new acquirer grabbing the just incremented resource value from us. So its | |
// best to check if there are any waiters first, and transfer owernship of the resource thats | |
// being released directly to the waiter to avoid this problem. | |
std::unique_lock lk{m_waiter_mutex}; | |
if (m_acquire_waiters != nullptr) | |
{ | |
acquire_operation* to_resume = m_acquire_waiters; | |
m_acquire_waiters = m_acquire_waiters->m_next; | |
lk.unlock(); | |
// This will transfer ownership of the resource to the resumed waiter. | |
to_resume->m_awaiting_coroutine.resume(); | |
} | |
else | |
{ | |
// Normally would be release but within a lock use releaxed. | |
m_counter.fetch_add(count, std::memory_order::relaxed); | |
} | |
} | |
auto semaphore::try_acquire(ptrdiff_t count) -> bool | |
{ | |
// Optimistically grab the resource. | |
auto previous = m_counter.fetch_sub(count, std::memory_order::acq_rel); | |
if (previous < count) | |
{ | |
// If it wasn't available undo the acquisition. | |
m_counter.fetch_add(count, std::memory_order::release); | |
return false; | |
} | |
return true; | |
} | |
auto semaphore::notify_waiters() noexcept -> void | |
{ | |
m_notify_all_set.exchange(true, std::memory_order::release); | |
while (true) | |
{ | |
std::unique_lock lk{m_waiter_mutex}; | |
if (m_acquire_waiters != nullptr) | |
{ | |
acquire_operation* to_resume = m_acquire_waiters; | |
m_acquire_waiters = m_acquire_waiters->m_next; | |
lk.unlock(); | |
to_resume->m_awaiting_coroutine.resume(); | |
} | |
else | |
{ | |
break; | |
} | |
} | |
} | |
} // namespace test
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment