Created
September 28, 2017 07:39
-
-
Save ducky-hong/4beb47df511d1999b4363900ceed8f38 to your computer and use it in GitHub Desktop.
detecting system time changes via timerfd
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# detecting system time changes via timerfd | |
# https://github.com/tsavola/pytimerfd | |
# | |
# Usage: | |
# $ python3 timerfd.py | |
# $ date -s "2 OCT 2006 18:00:00" | |
import ctypes | |
import ctypes.util | |
import selectors | |
import os | |
import math | |
import struct | |
CLOCK_REALTIME = 0 | |
TFD_NONBLOCK = 0o004000 | |
TFD_TIMER_ABSTIME = 1 << 0 | |
TFD_TIMER_CANCEL_ON_SET = 1 << 1 | |
libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True) | |
class timespec(ctypes.Structure): | |
_fields_ = [ | |
("tv_sec", libc.time.restype), | |
("tv_nsec", ctypes.c_long), | |
] | |
def __init__(self, time=None): | |
ctypes.Structure.__init__(self) | |
if time is not None: | |
self.set_time(time) | |
def __repr__(self): | |
return "timerfd.timespec(%s)" % self.get_time() | |
def set_time(self, time): | |
fraction, integer = math.modf(time) | |
self.tv_sec = int(integer) | |
self.tv_nsec = int(fraction * 1000000000) | |
def get_time(self): | |
if self.tv_nsec: | |
return self.tv_sec + self.tv_nsec / 1000000000.0 | |
else: | |
return self.tv_sec | |
class itimerspec(ctypes.Structure): | |
_fields_ = [ | |
("it_interval", timespec), | |
("it_value", timespec), | |
] | |
def __init__(self, interval=None, value=None): | |
ctypes.Structure.__init__(self) | |
if interval is not None: | |
self.it_interval.set_time(interval) | |
if value is not None: | |
self.it_value.set_time(value) | |
def __repr__(self): | |
items = [("interval", self.it_interval), ("value", self.it_value)] | |
args = ["%s=%s" % (name, value.get_time()) for name, value in items] | |
return "timerfd.itimerspec(%s)" % ", ".join(args) | |
def set_interval(self, time): | |
self.it_interval.set_time(time) | |
def get_interval(self): | |
return self.it_interval.get_time() | |
def set_value(self, time): | |
self.it_value.set_time(time) | |
def get_value(self): | |
return self.it_value.get_time() | |
def set_timer(): | |
its = itimerspec(None, (1 << 31) - 1) | |
libc.timerfd_settime(fd, TFD_TIMER_ABSTIME|TFD_TIMER_CANCEL_ON_SET, ctypes.pointer(its), None) | |
print(its) | |
fd = libc.timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK) | |
print("timerfd:", fd) | |
sel = selectors.DefaultSelector() | |
sel.register(fd, selectors.EVENT_READ) | |
while True: | |
set_timer() | |
events = sel.select() | |
print(events) | |
try: | |
os.read(fd, 8) | |
except OSError as e: | |
print(e) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment