Last active
December 18, 2022 16:51
-
-
Save Desour/5351f1fe74a88f539ab909b6a7a254ed to your computer and use it in GitHub Desktop.
Use futexes plus busy waiting for low-latency (approx. 130 ns on average) synchronous IPC calls between processes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// copied and adapted from the example in man page futex(2) ("futex_demo.c") | |
// requires libfmt (because of personal dislike of c++ stream output) | |
// compile with: | |
// g++ -O2 -Wall -Wextra -std=c++17 -g -o "test_futex" "test_futex.cpp" -lfmt | |
// The busy waiting uses x86-64 intrinsics, so the code as written targets x86-64:
// * rdtsc: no longer used, because the benchmark runs faster without it
//   (IIRC, timestamp-counter values are also processor-specific).
// * pause: always available on x86-64.
#include <cstdio>
#include <cerrno>
#include <stdatomic.h>
#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <ctime>
#include <new>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/time.h>
#include <immintrin.h>
#include <fmt/core.h>
// Selects the waiting strategy used by fwait(): spin with PAUSE before falling
// back to FUTEX_WAIT (true), or go straight to FUTEX_WAIT (false).
constexpr bool do_busy_waiting = true;

// Prints `msg` followed by the description of the current errno (perror) and
// terminates the process with EXIT_FAILURE. Written as a [[noreturn]] function
// rather than a macro so the argument is type-checked and the compiler knows
// that control never comes back to the caller.
[[noreturn]] static void errExit(const char *msg)
{
    perror(msg);
    exit(EXIT_FAILURE);
}

// The atomics are handed to the futex syscall as raw 32-bit futex words, so they
// must have exactly the size of uint32_t and must be lock-free (a lock-based
// std::atomic would rely on a process-local lock and not work across fork()).
using atomic_uint32_t = std::atomic<uint32_t>;
static_assert(sizeof(atomic_uint32_t) == sizeof(uint32_t));
static_assert(atomic_uint32_t::is_always_lock_free);

// Layout of the MAP_SHARED anonymous mapping created in main() and used by both
// the parent and the forked child process.
struct SharedState {
    atomic_uint32_t futex1; // posted by parent(), waited on by child()
    atomic_uint32_t futex2; // posted by child(), waited on by parent()
    int val;                // request value (parent -> child) / reply 13 (child -> parent)
    int data[32];           // only referenced by commented-out code in child()
};

// Points into the shared mapping; assigned in main() before fork().
static SharedState *shared_state = nullptr;
// Spins for `dt` iterations, executing one x86 PAUSE instruction per iteration.
// `dt` is an iteration count, not a duration: the earlier rdtsc-based timing
// was removed. Non-positive values return immediately.
static void busy_wait(int64_t dt)
{
    int64_t remaining = dt;
    while (remaining > 0) {
        _mm_pause(); // spin-loop hint to the CPU
        --remaining;
    }
}
static int | |
futex(atomic_uint32_t *uaddr, int futex_op, uint32_t val, | |
const struct timespec *timeout, uint32_t *uaddr2, uint32_t val3) | |
{ | |
return syscall(SYS_futex, uaddr, futex_op, val, | |
timeout, uaddr2, val3); | |
} | |
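// futex word states (shared protocol of fwait()/fpost()):
// * 0: not posted; the receiver is busy-waiting (or will re-check the value
//      before calling FUTEX_WAIT), so posting needs no FUTEX_WAKE
// * 1: posted and not yet consumed by the receiver
// * 2: not posted; the receiver has announced it may be sleeping in
//      FUTEX_WAIT, so the poster must issue FUTEX_WAKE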
static void | |
fwait(atomic_uint32_t *futexp) | |
{ | |
while (true) { | |
if constexpr (do_busy_waiting) { | |
for (int i = 0; i < 100; ++i) { | |
/* Is the futex available? */ | |
if (std::atomic_exchange(futexp, 0) == 1) | |
return; // yes | |
busy_wait(40); | |
} | |
// write 2 to show that we're futexing | |
if (std::atomic_exchange(futexp, 2) == 1) { | |
// futex was posted => change 2 to 0 (or 1 to 1) | |
std::atomic_fetch_and(futexp, 1); | |
return; | |
} | |
} else { | |
/* Is the futex available? */ | |
if (std::atomic_exchange(futexp, 2) == 1) | |
return; // yes | |
} | |
/* Futex is not available; wait. */ | |
long s = futex(futexp, FUTEX_WAIT, 2, nullptr, nullptr, 0); | |
if (s == -1 && errno != EAGAIN) | |
errExit("futex-FUTEX_WAIT"); | |
} | |
} | |
static void | |
fpost(atomic_uint32_t *futexp) | |
{ | |
uint32_t oldval = std::atomic_exchange(futexp, 1); | |
if (oldval == 2) { | |
long s = futex(futexp, FUTEX_WAKE, 1, nullptr, nullptr, 0); | |
if (s == -1) | |
errExit("futex-FUTEX_WAKE"); | |
} | |
} | |
// Returns the elapsed time t1 - t0 in seconds. The nanosecond difference may be
// negative (e.g. 3.1 s - 1.9 s); adding it to the whole-second difference still
// gives the correct total.
double timediff_seconds(struct timespec t1, struct timespec t0)
{
    const auto whole_seconds = t1.tv_sec - t0.tv_sec;
    const auto nanos = t1.tv_nsec - t0.tv_nsec;
    return whole_seconds + nanos * 1.0e-9;
}
// recvs val and answers with 13 | |
// if val==0, dies | |
void child() | |
{ | |
int val; | |
do { | |
fwait(&shared_state->futex1); | |
val = shared_state->val; | |
shared_state->val = 13; | |
//~ shared_state->val = rand(); | |
//~ shared_state->data[0] = rand(); | |
//~ shared_state->data[1] = rand(); | |
//~ shared_state->data[2] = rand(); | |
//~ shared_state->data[3] = rand(); | |
//~ fmt::print("[c] got val={}\n", val); | |
fpost(&shared_state->futex2); | |
} while (val != 0); | |
} | |
// sends num_calls vals, then waits for child to die | |
void parent(pid_t child_pid) | |
{ | |
static constexpr int num_calls = do_busy_waiting ? 1000000 : 100000; | |
struct timespec t0; | |
struct timespec t1; | |
clock_gettime(CLOCK_MONOTONIC, &t0); | |
for (int val = num_calls-1; val >= 0; --val) { | |
fwait(&shared_state->futex2); | |
//~ fmt::print("[p] got val={}, sending val={}\n", shared_state->val, val); | |
shared_state->val = val; | |
fpost(&shared_state->futex1); | |
} | |
clock_gettime(CLOCK_MONOTONIC, &t1); | |
double dt = timediff_seconds(t1, t0); | |
fmt::print("[p] dt = {} s; per call: {} ns\n", dt, dt / num_calls * 1e9); | |
fmt::print("[p] waiting for child to die...\n"); | |
int wstatus; | |
waitpid(child_pid, &wstatus, 0); | |
(void)wstatus; | |
} | |
int main() | |
{ | |
setbuf(stdout, nullptr); | |
fmt::print("starting...\n"); | |
/* Create a shared anonymous mapping that will hold the futexes. | |
Since the futexes are being shared between processes, we | |
subsequently use the "shared" futex operations (i.e., not the | |
ones suffixed "_PRIVATE"). */ | |
void *mmap_mem = mmap(nullptr, sizeof(SharedState), PROT_READ | PROT_WRITE, | |
MAP_ANONYMOUS | MAP_SHARED, -1, 0); | |
if (mmap_mem == MAP_FAILED) | |
errExit("mmap"); | |
shared_state = new(mmap_mem) SharedState; | |
shared_state->futex1 = 0; /* State: unavailable */ | |
shared_state->futex2 = 1; /* State: available */ | |
shared_state->val = -1; | |
/* Create a child process that inherits the shared anonymous | |
mapping. */ | |
pid_t pid = fork(); | |
if (pid == -1) { | |
errExit("fork"); | |
} else if (pid == 0) { | |
child(); | |
fmt::print("[c] end\n"); | |
} else { | |
parent(pid); | |
fmt::print("[p] end\n"); | |
//~ munmap(mmapped_mem, 0x1000); | |
} | |
wait(nullptr); | |
exit(EXIT_SUCCESS); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment