Skip to content

Instantly share code, notes, and snippets.

@resetius
Last active August 4, 2025 22:45
Show Gist options
  • Save resetius/6143ba58a7a8e6da9adc17a8220b283c to your computer and use it in GitHub Desktop.
Save resetius/6143ba58a7a8e6da9adc17a8220b283c to your computer and use it in GitHub Desktop.
ydb::actors example
#include <ydb/library/actors/core/actorsystem.h>
#include <ydb/library/actors/core/executor_pool_basic.h>
#include <ydb/library/actors/core/scheduler_basic.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/util/should_continue.h>
#include <util/system/sigset.h>
#include <util/generic/xrange.h>

#include <chrono>
#include <csignal>
#include <exception>
#include <iostream>
#include <string>
#include <vector>
// rpi5 (4 cores, 16GB), actors=100, messages=1000000, batch size=1024
// akka: 89795.61 msg/s
// ydb/actors (4 threads): 112699 msg/s
// coroio (1 thread): 182181 msg/s
using namespace NActors;
static TProgramShouldContinue ShouldContinue;
// Asks the main loop to stop by flipping the global ShouldContinue state.
// Installed as the SIGINT/SIGTERM handler in main() and also called directly
// by the ring when the benchmark completes; the signal number is not used.
void OnTerminate([[maybe_unused]] int signum) {
    ShouldContinue.ShouldStop();
}
class TPingActor : public TActorBootstrapped<TPingActor> {
int Idx_ = 0;
int Messages_ = 0;
int Batch_ = 0;
int Remain_ = 0;
std::vector<TActorId>& Ring_;
bool TimerStarted_ = false;
std::chrono::steady_clock::time_point StartTime_;
int LastPercent_ = -1;
void Handle(TEvents::TEvPing::TPtr &ev) {
Y_UNUSED(ev);
if (Idx_ == 0 && !TimerStarted_) [[unlikely]] {
StartTime_ = std::chrono::steady_clock::now();
TimerStarted_ = true;
}
if (Idx_ == 0 && Remain_ == 0) [[unlikely]] {
return;
}
Send(Ring_[(Idx_+1)%Ring_.size()], new TEvents::TEvPing());
if (Idx_ == 0) {
--Remain_;
PrintProgress();
if (Remain_ == 0) {
ShutdownRing();
}
}
}
public:
TPingActor(int idx, int messages, int batch, std::vector<TActorId>& ring)
: Idx_(idx)
, Messages_(messages)
, Batch_(batch)
, Remain_(Messages_)
, Ring_(ring)
{}
STFUNC(StatePing) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvents::TEvPing, Handle);
}
}
void ShutdownRing() {
auto now = std::chrono::steady_clock::now();
double secs = std::chrono::duration<double>(now - StartTime_).count();
std::cout << "\nRing throughput: "
<< (double)(Messages_) / secs
<< " msg/s\n";
OnTerminate(0);
}
void Bootstrap() {
Become(&TThis::StatePing);
if (Idx_ == 0) {
for (int i = 0; i < Batch_; ++i) {
Send(Ring_[(Idx_+1)%Ring_.size()], new TEvents::TEvPing());
}
}
}
void PrintProgress() {
size_t processed = Messages_ - Remain_;
int percent = int((processed * 100) / Messages_);
if (percent != LastPercent_) {
LastPercent_ = percent;
const int barWidth = 50;
int pos = (percent * barWidth) / 100;
std::cerr << "\r[";
for (int i = 0; i < barWidth; ++i) {
if (i < pos) {
std::cerr << "=";
}
else if (i == pos) {
std::cerr << ">";
}
else {
std::cerr << " ";
}
}
std::cerr << "] " << percent << "%";
std::cerr.flush();
}
}
};
THolder<TActorSystemSetup> BuildActorSystemSetup(ui32 threads, ui32 pools) {
Y_ABORT_UNLESS(threads > 0 && threads < 100);
Y_ABORT_UNLESS(pools > 0 && pools < 10);
auto setup = MakeHolder<TActorSystemSetup>();
setup->NodeId = 1;
setup->ExecutorsCount = pools;
setup->Executors.Reset(new TAutoPtr<IExecutorPool>[pools]);
for (ui32 idx : xrange(pools)) {
setup->Executors[idx] = new TBasicExecutorPool(idx, threads, 50);
}
setup->Scheduler = new TBasicSchedulerThread(TSchedulerConfig(512, 0));
return setup;
}
int main(int argc, char **argv) {
int actors = 2;
int messages = 100;
int batch = 1;
for (int i = 1; i < argc; ++i) {
if (std::string(argv[i]) == "--actors" && i + 1 < argc) {
actors = std::stoi(argv[++i]);
} else if (std::string(argv[i]) == "--messages" && i + 1 < argc) {
messages = std::stoi(argv[++i]);
} else if (std::string(argv[i]) == "--batch" && i + 1 < argc) {
batch = std::stoi(argv[++i]);
}
}
std::vector<TActorId> ringIds;
ringIds.resize(actors);
#ifdef _unix_
signal(SIGPIPE, SIG_IGN);
#endif
signal(SIGINT, &OnTerminate);
signal(SIGTERM, &OnTerminate);
THolder<TActorSystemSetup> actorSystemSetup = BuildActorSystemSetup(4, 1);
TActorSystem actorSystem(actorSystemSetup);
actorSystem.Start();
for (int i = 0; i < actors; ++i) {
ringIds[i] = actorSystem.Register(new TPingActor(i, messages, batch, ringIds));
}
while (ShouldContinue.PollState() == TProgramShouldContinue::Continue) {
Sleep(TDuration::MilliSeconds(200));
}
actorSystem.Stop();
actorSystem.Cleanup();
return ShouldContinue.GetReturnCode();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment