Last active
February 20, 2024 05:32
-
-
Save jizhilong/1b158b9b2446d01b1c77510056e98f73 to your computer and use it in GitHub Desktop.
timerfd_create/timerfd_settime/timerfd_gettime implementations based on ctypes and libc
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
timerfd_* functions and demos for Python versions prior to 3.13, implemented with ctypes and libc.
""" | |
try: | |
from os import timerfd_create, timerfd_settime, timerfd_gettime, timerfd_settime_ns, timerfd_gettime_ns | |
from os import TFD_NONBLOCK, TFD_CLOEXEC, TFD_TIMER_ABSTIME, TFD_TIMER_CANCEL_ON_SET | |
except ImportError: | |
import os | |
import time | |
from typing import Optional | |
from ctypes import CDLL, c_long, Structure, c_int, get_errno, POINTER, byref | |
from ctypes.util import find_library | |
class TimeSpec(Structure):
    """ctypes structure holding a seconds / nanoseconds pair.

    Both members are declared as ``c_long``, in the order ``tv_sec`` then
    ``tv_nsec``. It is the member type used by ``ITimerSpec``.
    """

    _fields_ = [(member, c_long) for member in ("tv_sec", "tv_nsec")]
class ITimerSpec(Structure):
    """ctypes structure pairing a repeat interval with an expiration value.

    Both members are ``TimeSpec`` instances, declared in the order
    ``it_interval`` then ``it_value``.
    """

    _fields_ = [(member, TimeSpec) for member in ("it_interval", "it_value")]


# Pointer type used in the argtypes of the libc timerfd prototypes.
ITimerSpecP = POINTER(ITimerSpec)
# Clock identifiers that can be passed as the first argument of timerfd_create
# (Linux numbering).
CLOCK_REALTIME = 0
CLOCK_MONOTONIC = 1
CLOCK_BOOTTIME = 7
CLOCK_REALTIME_ALARM = 8
CLOCK_BOOTTIME_ALARM = 9

# Flags for timerfd_create.  <sys/timerfd.h> defines them as O_NONBLOCK and
# O_CLOEXEC, so take the values from ``os`` instead of hard-coding them: the
# numbers differ between architectures, and the previously hard-coded
# 0x8000 was not O_CLOEXEC at all (O_CLOEXEC is 0o2000000 == 0x80000 on
# x86/arm), which made timerfd_create(..., flags=TFD_CLOEXEC) fail with EINVAL.
TFD_NONBLOCK = os.O_NONBLOCK
TFD_CLOEXEC = os.O_CLOEXEC

# Flags for timerfd_settime.
TFD_TIMER_ABSTIME = 1
TFD_TIMER_CANCEL_ON_SET = 2
# Locate the C library; without it none of the fallbacks below can work, so
# the failure is reported as an ImportError.
libc_name = find_library('c')
if not libc_name:
    raise ImportError("cannot find libc")

# use_errno=True lets ctypes.get_errno() report the errno left by each call.
libc = CDLL(libc_name, use_errno=True)

# Resolve the three timerfd entry points.
c_timerfd_create = libc.timerfd_create
c_timerfd_settime = libc.timerfd_settime
c_timerfd_gettime = libc.timerfd_gettime

# Declare the C prototypes so ctypes converts and checks the arguments.
c_timerfd_create.argtypes = [c_int, c_int]
c_timerfd_settime.argtypes = [c_int, c_int, ITimerSpecP, ITimerSpecP]
c_timerfd_gettime.argtypes = [c_int, ITimerSpecP]

# All three functions return a C int.
c_timerfd_create.restype = c_timerfd_settime.restype = c_timerfd_gettime.restype = c_int
def timerfd_create(clock_id=CLOCK_REALTIME, /, *, flags=0) -> int:
    """Create a timer file descriptor by calling libc ``timerfd_create``.

    Args:
        clock_id: Clock identifier passed to libc (positional-only); one of
            the ``CLOCK_*`` constants. Defaults to ``CLOCK_REALTIME``.
        flags: Flag bits passed to libc unchanged, e.g. ``TFD_NONBLOCK`` or
            ``TFD_CLOEXEC``.

    Returns:
        The value returned by libc, i.e. the new file descriptor.

    Raises:
        OSError: If libc returns a negative value. ``errno`` is set to the
            C errno of the failed call.
    """
    fd = c_timerfd_create(clock_id, flags)
    if fd < 0:
        err_code = get_errno()
        # Pass errno as the first argument so that ``exc.errno`` is populated
        # and callers can react to specific failures (a single-string OSError
        # leaves ``errno`` as None).
        raise OSError(err_code, f"timerfd_create failed: {os.strerror(err_code)}")
    return fd
def timerfd_settime(fd: int, /, *,
                    flags: int = 0,
                    initial: float = 0.0, interval: float = 0.0) -> tuple[float, float]:
    """Arm or disarm a timer fd, taking and returning times in float seconds.

    Args:
        fd: Timer file descriptor (positional-only).
        flags: Flag bits passed to libc unchanged, e.g. ``TFD_TIMER_ABSTIME``.
        initial: Value stored in ``it_value``, in seconds.
        interval: Value stored in ``it_interval``, in seconds.

    Returns:
        ``(it_value, it_interval)`` of the setting that libc reports as the
        previous one, converted to float seconds.

    Raises:
        OSError: If libc returns a negative value. ``errno`` is set to the
            C errno of the failed call.
    """
    new_spec = ITimerSpec()
    # Split each float into whole seconds and nanoseconds.  The fractional
    # part is truncated (not rounded) to an integer number of nanoseconds.
    initial_sec = int(initial)
    new_spec.it_value.tv_sec = initial_sec
    new_spec.it_value.tv_nsec = int((initial - initial_sec) * 1e9)
    interval_sec = int(interval)
    new_spec.it_interval.tv_sec = interval_sec
    new_spec.it_interval.tv_nsec = int((interval - interval_sec) * 1e9)

    old_spec = ITimerSpec()
    if c_timerfd_settime(fd, flags, byref(new_spec), byref(old_spec)) < 0:
        err_code = get_errno()
        # Pass errno as the first argument so that ``exc.errno`` is populated.
        raise OSError(err_code, f"timerfd_settime failed: {os.strerror(err_code)}")

    old_initial = old_spec.it_value.tv_sec + old_spec.it_value.tv_nsec * 1e-9
    old_interval = old_spec.it_interval.tv_sec + old_spec.it_interval.tv_nsec * 1e-9
    return old_initial, old_interval
def timerfd_settime_ns(fd: int, /, *,
                       flags: int = 0,
                       initial: int = 0, interval: int = 0) -> tuple[int, int]:
    """Arm or disarm a timer fd, taking and returning integer nanoseconds.

    Args:
        fd: Timer file descriptor (positional-only).
        flags: Flag bits passed to libc unchanged, e.g. ``TFD_TIMER_ABSTIME``.
        initial: Value stored in ``it_value``, in nanoseconds.
        interval: Value stored in ``it_interval``, in nanoseconds.

    Returns:
        ``(it_value, it_interval)`` of the setting that libc reports as the
        previous one, in nanoseconds.

    Raises:
        OSError: If libc returns a negative value. ``errno`` is set to the
            C errno of the failed call.
    """
    ns_per_sec = 10**9
    new_spec = ITimerSpec()
    # divmod yields the same (// , %) pair the fields need in one step.
    new_spec.it_value.tv_sec, new_spec.it_value.tv_nsec = divmod(initial, ns_per_sec)
    new_spec.it_interval.tv_sec, new_spec.it_interval.tv_nsec = divmod(interval, ns_per_sec)

    old_spec = ITimerSpec()
    if c_timerfd_settime(fd, flags, byref(new_spec), byref(old_spec)) < 0:
        err_code = get_errno()
        # Pass errno as the first argument so that ``exc.errno`` is populated.
        raise OSError(err_code, f"timerfd_settime failed: {os.strerror(err_code)}")

    old_initial = old_spec.it_value.tv_sec * ns_per_sec + old_spec.it_value.tv_nsec
    old_interval = old_spec.it_interval.tv_sec * ns_per_sec + old_spec.it_interval.tv_nsec
    return old_initial, old_interval
def timerfd_gettime(fd) -> tuple[float, float]:
    """Read a timer fd's current setting as float seconds.

    Args:
        fd: Timer file descriptor.

    Returns:
        ``(it_value, it_interval)`` as filled in by libc
        ``timerfd_gettime``, converted to float seconds.

    Raises:
        OSError: If libc returns a negative value. ``errno`` is set to the
            C errno of the failed call.
    """
    current = ITimerSpec()
    if c_timerfd_gettime(fd, byref(current)) < 0:
        err_code = get_errno()
        # Pass errno as the first argument so that ``exc.errno`` is populated.
        raise OSError(err_code, f"timerfd_gettime failed: {os.strerror(err_code)}")

    value = current.it_value.tv_sec + current.it_value.tv_nsec * 1e-9
    interval = current.it_interval.tv_sec + current.it_interval.tv_nsec * 1e-9
    return value, interval
def timerfd_gettime_ns(fd) -> tuple[int, int]:
    """Read a timer fd's current setting as integer nanoseconds.

    Args:
        fd: Timer file descriptor.

    Returns:
        ``(it_value, it_interval)`` as filled in by libc
        ``timerfd_gettime``, in nanoseconds.

    Raises:
        OSError: If libc returns a negative value. ``errno`` is set to the
            C errno of the failed call.
    """
    ns_per_sec = 10**9
    current = ITimerSpec()
    if c_timerfd_gettime(fd, byref(current)) < 0:
        err_code = get_errno()
        # Pass errno as the first argument so that ``exc.errno`` is populated.
        raise OSError(err_code, f"timerfd_gettime failed: {os.strerror(err_code)}")

    value = current.it_value.tv_sec * ns_per_sec + current.it_value.tv_nsec
    interval = current.it_interval.tv_sec * ns_per_sec + current.it_interval.tv_nsec
    return value, interval
if __name__ == '__main__':
    # Demo: a UDP echo-style server with an idle timer driven by a timerfd.
    # Every received datagram re-arms the timer; the timer firing is logged.
    #
    # ``os`` and ``time`` are imported here on purpose: at module level they
    # are only bound when the ctypes fallback branch runs, so on Python 3.13+
    # (where the ``from os import ...`` succeeds) they would be undefined.
    import logging
    import os
    import selectors
    import socket
    import sys
    import time

    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    logger = logging.getLogger("timerfd_demo")

    # The stdlib os.timerfd_create requires the clock id, so pass it
    # explicitly; this works with both the stdlib and the ctypes fallback.
    timer_fd = timerfd_create(time.CLOCK_REALTIME)
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock, \
                selectors.DefaultSelector() as selector:
            sock.bind(('0.0.0.0', 51121))
            selector.register(sock, selectors.EVENT_READ)
            selector.register(timer_fd, selectors.EVENT_READ)
            timerfd_settime(timer_fd, initial=1, interval=5)
            while True:
                for key, _mask in selector.select():
                    if key.fileobj == sock:
                        data, addr = sock.recvfrom(1024)
                        # Traffic arrived: push the idle timer back.
                        timerfd_settime(timer_fd, initial=5, interval=5)
                        logger.info("recv %s from %s", data, addr)
                        sock.sendto(b'hello', addr)
                    elif key.fileobj == timer_fd:
                        # Reading a timerfd yields an 8-byte unsigned counter
                        # of expirations in the host's native byte order.
                        times = int.from_bytes(os.read(timer_fd, 8), sys.byteorder)
                        logger.info("idle timer triggerd %s more times", times)
                    else:
                        logger.info("unknown fd %s", key.fileobj)
    finally:
        # Release the timer fd even when the loop is left via an exception
        # (e.g. KeyboardInterrupt).
        os.close(timer_fd)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment