Skip to content

Instantly share code, notes, and snippets.

@IhostVlad
Last active September 15, 2021 10:19
Show Gist options
  • Save IhostVlad/127483e7bfae6f8e6df4f89093559d01 to your computer and use it in GitHub Desktop.
Save IhostVlad/127483e7bfae6f8e6df4f89093559d01 to your computer and use it in GitHub Desktop.
// g++ -o bench.exe --std=c++17 bench.cc -lpqxx -lpq -pthread
// POSTGRES_CONNECTION_STRING=<connection string> ./bench.exe <threads count> <iterations count> <max rows> <mode 1-5>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <future>
#include <iostream>
#include <limits>
#include <random>
#include <string>
#include <thread>
#include <vector>

#include <pqxx/pqxx>
// Sentinel returned by worker() when a thread gives up on a non-retryable
// error: the largest value representable in uint64_t.
constexpr uint64_t FAIL = std::numeric_limits<uint64_t>::max();
// Returns a pseudo-random integer uniformly distributed over [0, INT_MAX].
//
// The distribution is defined over `int` itself. Drawing a 64-bit value and
// narrowing it to `int` (as this function used to do) yields negative results
// for roughly half of the calls.
//
// Every thread owns its own Mersenne Twister engine (thread_local), so calls
// from different threads do not share state. The engine is default-seeded,
// therefore every thread - and every run - observes the same sequence.
int intrand() {
    static thread_local std::mt19937 generator{};
    std::uniform_int_distribution<int> distribution{0, std::numeric_limits<int>::max()};
    return distribution(generator);
}
// Executes `sql` in a dedicated transaction on `conn`, commits it, and returns
// the number of rows in the result set.
//
// The first column of each returned row is accessed and discarded
// (presumably so that the measured time includes touching the fetched data).
uint64_t execute_query(pqxx::connection& conn, const std::string& sql) {
    pqxx::work txn{conn};
    pqxx::result rows{txn.exec(sql)};
    for (auto row : rows) {
        (void)(row[0]);
    }
    txn.commit();
    return rows.size();
}
// std::string overload of getenv(): looks up the environment variable `key`
// and returns its value, or an empty string when the variable is not set.
std::string getenv(std::string const & key) {
    if (const char* raw = ::getenv(key.c_str())) {
        return std::string(raw);
    }
    return std::string("");
}
uint64_t worker(std::string conn_str, uint64_t maxrows, uint64_t thid, uint64_t threads, uint64_t iters, uint64_t mode) {
typedef std::chrono::high_resolution_clock Time;
typedef std::chrono::nanoseconds ns;
typedef std::chrono::duration<float> fsec;
auto t0 = Time::now();
while(true) {
try {
pqxx::connection C{conn_str};
char sql_query[4096] = {};
for(uint64_t k=0; k<iters; k++) {
uint64_t i = ((k + intrand()) % 0x7fffffffull) % maxrows;
switch(mode) {
case 5: {
uint64_t lv = intrand();
if(lv % 3 == 0) {
snprintf(sql_query, 4096,
"INSERT INTO \"public\".\"load_test\"(\"idx\", \"content\") VALUES (%llu, 'Content %llu ') ON CONFLICT DO NOTHING; ",
i, i );
} else if (lv % 3 == 1) {
snprintf(sql_query, 4096,
"UPDATE \"public\".\"load_test\" SET \"content\"='Content %llu ' WHERE \"idx\" = %llu; ",
i, i );
} else {
snprintf(sql_query, 4096,
"DELETE FROM \"public\".\"load_test\" WHERE \"idx\" = %llu; ",
i );
}
break;
}
case 4: {
snprintf(sql_query, 4096,
"UPDATE \"public\".\"load_test\" SET \"content\"='Content %llu ' WHERE \"idx\" = %llu; ",
i, i );
break;
}
case 3: {
snprintf(sql_query, 4096,
"UPDATE \"public\".\"load_test\" SET \"content\"='Content %llu ' WHERE \"idx\" = %llu; ",
i, ((maxrows / threads) * i + thid) % maxrows );
break;
}
case 2: {
snprintf(sql_query, 4096,
"UPDATE \"public\".\"load_test\" SET \"content\"='Content %llu ' WHERE \"idx\" = %llu; ",
i, (maxrows / threads) * thid + i % (maxrows / threads) );
break;
}
case 1: {
snprintf(sql_query, 4096, "SELECT 0;");
break;
}
default:
break;
}
execute_query(C, sql_query);
}
break;
} catch (std::exception const &e) {
std::string err = e.what();
if(err.find("could not translate host name") != std::string::npos ||
err.find("could not create socket") != std::string::npos ||
err.find("server closed the connection unexpectedly") != std::string::npos
) {
std::this_thread::sleep_for(std::chrono::milliseconds(100 ));
} else {
std::cerr << err << std::endl;
return FAIL;
}
}
}
auto t1 = Time::now();
fsec fs = t1 - t0;
ns d = std::chrono::duration_cast<ns>(fs);
uint64_t average = d.count() / iters;
return average;
}
// Parses `text` as a strictly positive decimal integer.
// Returns true and stores the value in `out` on success; returns false (and
// leaves `out` untouched) for empty input, a leading sign or whitespace,
// trailing garbage, zero, or a value that does not fit.
static bool parse_positive(const char* text, uint64_t& out) {
    if (text == nullptr || text[0] < '0' || text[0] > '9') {
        return false;
    }
    try {
        std::size_t consumed = 0;
        const unsigned long long value = std::stoull(text, &consumed, 10);
        if (text[consumed] != '\0' || value == 0) {
            return false;
        }
        out = value;
        return true;
    } catch (std::exception const &) {
        // std::stoull throws std::out_of_range for oversized values.
        return false;
    }
}

// Entry point.
//
// Usage: <threads count> <iterations count> <max rows> <mode 1-5>, with the
// connection string taken from the POSTGRES_CONNECTION_STRING environment
// variable.
//
// Recreates "public"."load_test" (pre-populated for modes 2-4), runs one
// worker() per thread, drops the table again and prints the mean of the
// per-thread averages, counting only threads that did not return FAIL.
//
// Exit status: 0 on success; 1 on bad arguments, on a database error in the
// main thread, or when no worker completed successfully.
int main(int argc, char* argv[]) {
    if (argc != 5) {
        std::cout << "Wrong arguments count" << std::endl;
        return 1;
    }

    uint64_t threads = 0;
    uint64_t iters = 0;
    uint64_t maxrows = 0;
    uint64_t mode = 0;
    // Zero for any of these would later be used as a divisor.
    if (!parse_positive(argv[1], threads) ||
        !parse_positive(argv[2], iters) ||
        !parse_positive(argv[3], maxrows)) {
        std::cout << "Wrong arguments: threads count, iterations count and max rows must be positive integers" << std::endl;
        return 1;
    }
    if (!parse_positive(argv[4], mode) || mode > 5) {
        std::cout << "Wrong mode" << std::endl;
        return 1;
    }

    const std::string conn_str = getenv(std::string{"POSTGRES_CONNECTION_STRING"});
    std::cout << threads << " " << iters << " " << mode << std::endl;

    try {
        // Ordinary automatic object: correctly aligned, and its destructor
        // closes the connection on every exit path.
        pqxx::connection C{conn_str};

        // Runs one statement in its own committed transaction.
        const auto run_statement = [&C](const std::string& sql) {
            pqxx::work work{C};
            work.exec(sql);
            work.commit();
        };

        run_statement("DROP TABLE IF EXISTS \"public\".\"load_test\";");
        run_statement("CREATE TABLE \"public\".\"load_test\"(\"idx\" BIGINT, \"content\" VARCHAR(190), PRIMARY KEY(\"idx\"));");
        if (mode >= 2 && mode <= 4) {
            // The UPDATE-only modes need rows to exist beforehand.
            char sql_query[4096] = {};
            std::snprintf(sql_query, sizeof(sql_query),
                "INSERT INTO \"public\".\"load_test\"(\"idx\",\"content\") SELECT i AS \"idx\", '' AS \"content\" FROM generate_series(0,%llu) as t(i);",
                static_cast<unsigned long long>(maxrows));
            run_statement(sql_query);
        }

        std::vector<std::future<uint64_t>> timing_futures;
        timing_futures.reserve(threads);
        for (uint64_t idx = 0; idx < threads; idx++) {
            // std::launch::async forces a real thread per worker; the default
            // policy allows deferred execution, which would run the workers
            // one after another inside get().
            timing_futures.push_back(std::async(std::launch::async, &worker, conn_str, maxrows, idx, threads, iters, mode));
        }

        uint64_t sum_timing = 0;
        uint64_t succeeded = 0;
        for (uint64_t idx = 0; idx < timing_futures.size(); idx++) {
            const uint64_t timing = timing_futures[idx].get();
            if (timing != FAIL) {
                sum_timing += timing;
                ++succeeded;
            } else {
                std::cout << "Thread " << idx << " failed" << std::endl;
            }
        }

        run_statement("DROP TABLE \"public\".\"load_test\";");

        if (succeeded == 0) {
            std::cout << "No thread completed successfully" << std::endl;
            return 1;
        }
        // Failed threads contribute no timing, so they must not count in the divisor.
        const uint64_t average = sum_timing / succeeded;
        std::cout << "Average " << average << " ns" << std::endl;
    } catch (std::exception const &e) {
        std::cerr << e.what() << std::endl;
        return 1;
    }
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment