Last active
December 16, 2020 21:01
-
-
Save kimathie/5f1e409caadf0f96bf2423c27a34673a to your computer and use it in GitHub Desktop.
Hash timing wheel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.ConcurrentLinkedQueue; | |
import java.util.concurrent.TimeUnit; | |
/** | |
* A timing wheel is a data structure that that provides the ability to manage | |
* an event/action that should take place within a specified time window. The | |
* idea is to have a time facility that schedules and manage timeouts | |
* efficiently as described by George Varghese and Tony Lauck. | |
* | |
* This implementation is a hashed wheel timer with two critical parameters only | |
* to adjust latency based on needs. | |
* | |
* <h3> Interval</h3> | |
* | |
* This is the time period in which the timer will rotate on every tick. Each | |
* tick will perform 3 actions | |
* <p> | |
* 1. Check for newly scheduled timers and insert them to the wheel.</p> | |
* <p> | |
* 2. Check for any cancelled timers and remove them from the wheel.</p> | |
* <p> | |
* 3. Rotate through a select bucket and check for expired timers.</p> | |
* | |
* <p> | |
* These actions ensure that the timers are added to the timer at approximate | |
* time, checked for staleness that is if they have expired or been cancelled | |
* and finally clean up that limited data structure to allow room for more | |
* timers to be scheduled in future. Depending on the needs of the application a | |
* lower interval increases the of accuracy the timer. By default the value is | |
* 100ms. | |
* </p> | |
* | |
* <h3>Wheel Size</h3> | |
* This is the number of elements that the timing wheel can store. A timing | |
* wheel is a hash table under the hood which is a data structure that maps | |
* unique keys to values. In a timing wheel a key is represented by the expiry | |
* time of the timer and the value would be the timer it's self. Depending on | |
* the needs of the application a large wheel size would be set to handle a | |
* large number of timeouts. By default the value herein is 64. | |
* | |
* @author kimathie | |
*/ | |
public class TimerWheel { | |
private long interval; | |
private Thread thread; | |
private TimeoutBucket[] wheel; | |
private ConcurrentLinkedQueue<Participant> scheduled; | |
private ConcurrentLinkedQueue<Participant> cancelled; | |
private static final int STARTED = 0; | |
private static final int STOPPED = 1; | |
private final AtomicInteger state = new AtomicInteger(STOPPED); | |
public TimerWheel() { | |
this(100, TimeUnit.MILLISECONDS, 64); | |
} | |
public TimerWheel(long interval, TimeUnit unit) { | |
this(interval, unit, 64); | |
} | |
public TimerWheel(int wheelSize) { | |
this(100, TimeUnit.MILLISECONDS, wheelSize); | |
} | |
public TimerWheel(long interval, TimeUnit unit, int wheelSize) { | |
this.interval = unit.toMillis(interval); | |
this.wheel = createWheel(wheelSize); | |
this.scheduled = new ConcurrentLinkedQueue(); | |
this.cancelled = new ConcurrentLinkedQueue(); | |
} | |
public void start() { | |
if (STOPPED == state.get()) { | |
thread = new Thread(new Worker()); | |
thread.start(); | |
state.compareAndSet(STOPPED, STARTED); | |
} | |
} | |
public void stop() { | |
if (STARTED == state.get()) { | |
thread.interrupt(); | |
state.compareAndSet(STARTED, STOPPED); | |
} | |
} | |
public Timeout schedule(TimerTask task, long delay, TimeUnit unit) { | |
long deadline = System.nanoTime() + unit.toNanos(delay); | |
Participant p = new Participant(this, task, deadline); | |
scheduled.add(p); | |
return p; | |
} | |
class Worker implements Runnable { | |
@Override | |
public void run() { | |
int index = 0; | |
while (true) { | |
try { | |
if (STOPPED == state.get()) { | |
this.cleanUp(); | |
break; | |
} | |
Thread.sleep(interval); | |
processCancelled(); | |
processScheduled(); | |
processTimeouts(index); | |
index = index == wheel.length - 1 ? 0 : ++index; | |
} catch (InterruptedException e) { | |
state.compareAndSet(STARTED, STOPPED); | |
} | |
} | |
} | |
private void cleanUp() { | |
//Yet to implement but ideally should drain all timers and expire them gracefully | |
} | |
private void processCancelled() { | |
while (true) { | |
Participant p = cancelled.poll(); | |
if (p == null) { | |
break; | |
} | |
p.remove(); | |
} | |
} | |
private void processScheduled() { | |
while (true) { | |
Participant p = scheduled.poll(); | |
if (p == null) { | |
break; | |
} | |
if (p.isCancelled()) { | |
p.cancel(); | |
continue; | |
} | |
int index = (int) (p.deadline % wheel.length); | |
TimeoutBucket bucket = wheel[index]; | |
bucket.addTimeout(p); | |
} | |
} | |
private void processTimeouts(int index) { | |
TimeoutBucket bucket = wheel[index]; | |
bucket.expireTimeouts(System.nanoTime()); | |
} | |
} | |
class TimeoutBucket { | |
private Participant head; | |
private Participant tail; | |
/** | |
* Add {@link Timeout} to this bucket. | |
*/ | |
public void addTimeout(Participant p) { | |
p.bucket = this; | |
if (head == null) { | |
head = tail = p; | |
} else { | |
tail.next = p; | |
p.prev = tail; | |
tail = p; | |
} | |
} | |
public void expireTimeouts(long deadline) { | |
Participant p = head; | |
// process all timeouts | |
while (p != null) { | |
Participant next = p.next; | |
if (p.isStale(deadline)) { | |
next = remove(p); | |
p.expire(); | |
} else if (p.isCancelled()) { | |
next = remove(p); | |
} | |
p = next; | |
} | |
} | |
public Participant remove(Participant p) { | |
Participant next = p.next; | |
// remove timeout that was either processed or cancelled by updating the linked-list | |
if (p.prev != null) { | |
p.prev.next = next; | |
} | |
if (p.next != null) { | |
p.next.prev = p.prev; | |
} | |
if (p == head) { | |
// if timeout is also the tail we need to adjust the entry too | |
if (p == tail) { | |
tail = null; | |
head = null; | |
} else { | |
head = next; | |
} | |
} else if (p == tail) { | |
// if the timeout is the tail modify the tail to be the prev node. | |
tail = p.prev; | |
} | |
// null out prev, next and bucket to allow for GC. | |
p.prev = null; | |
p.next = null; | |
p.bucket = null; | |
return next; | |
} | |
} | |
class Participant implements Timeout { | |
public long deadline; | |
public TimeoutBucket bucket; | |
public Participant next; | |
public Participant prev; | |
private final int CREATED = 0; | |
private final int CANCELLED = 1; | |
private final int EXPIRED = 2; | |
private final AtomicInteger state = new AtomicInteger(CREATED); | |
private final TimerTask task; | |
private final TimerWheel timer; | |
public Participant(TimerWheel timer, TimerTask task, long deadline) { | |
this.timer = timer; | |
this.task = task; | |
this.deadline = deadline; | |
} | |
public boolean isStale(long time) { | |
return time >= deadline; | |
} | |
public int state() { | |
return state.get(); | |
} | |
public long deadline() { | |
return deadline; | |
} | |
@Override | |
public TimerTask task() { | |
return task; | |
} | |
@Override | |
public boolean cancel() { | |
if (!state.compareAndSet(CREATED, CANCELLED)) { | |
return false; | |
} | |
timer.cancelled.add(this); | |
return true; | |
} | |
@Override | |
public boolean isCancelled() { | |
return state() == CANCELLED; | |
} | |
@Override | |
public boolean isExpired() { | |
return state() == EXPIRED; | |
} | |
public void expire() { | |
if (!state.compareAndSet(CREATED, EXPIRED)) { | |
return; | |
} | |
try { | |
task.run(this); | |
} catch (Exception t) { | |
t.printStackTrace(); | |
} | |
} | |
public void remove() { | |
TimeoutBucket b = this.bucket; | |
if (b != null) { | |
b.remove(this); | |
} | |
} | |
} | |
private TimeoutBucket[] createWheel(int wheelSize) { | |
if (wheelSize <= 0) { | |
throw new IllegalArgumentException( | |
"Wheel size must be greater than 0: " + wheelSize); | |
} | |
if (wheelSize > 1073741824) { | |
throw new IllegalArgumentException( | |
"Wheel size may not be greater than 2^30: " + wheelSize); | |
} | |
/** | |
* Gets the nearest Prime number to ensure even distribution of hashes | |
* in the wheel. | |
*/ | |
wheelSize = nearestPrime(wheelSize); | |
/** | |
* Initializes the buckets. | |
*/ | |
TimeoutBucket[] timeoutWheel = new TimeoutBucket[wheelSize]; | |
for (int i = 0; i < timeoutWheel.length; i++) { | |
timeoutWheel[i] = new TimeoutBucket(); | |
} | |
return timeoutWheel; | |
} | |
private int nearestPrime(int M) { | |
if (!isPrime(M)) { | |
M = nearestPrime(++M); | |
} | |
return M; | |
} | |
private boolean isPrime(int M) { | |
for (int i = 2; i <= Math.sqrt(M); i++) { | |
if (M % i == 0) { | |
return false; | |
} | |
} | |
return true; | |
} | |
} | |
/** | |
* A handle associated with a {@link TimerTask} that is returned by a | |
* {@link Timer}. | |
*/ | |
static interface Timeout { | |
/** | |
* Returns the {@link TimerTask} which is associated with this handle. | |
* | |
* @return | |
*/ | |
public abstract TimerTask task(); | |
/** | |
* Returns {@code true} if and only if the {@link TimerTask} associated with | |
* this handle has been expired. | |
* | |
* @return | |
*/ | |
public abstract boolean isExpired(); | |
/** | |
* Returns {@code true} if and only if the {@link TimerTask} associated with | |
* this handle has been cancelled. | |
* | |
* @return | |
*/ | |
public abstract boolean isCancelled(); | |
/** | |
* Attempts to cancel the {@link TimerTask} associated with this handle. If | |
* the task has been executed or cancelled already, it will return with no | |
* side effect. | |
* | |
* @return True if the cancellation completed successfully, otherwise false | |
*/ | |
public abstract boolean cancel(); | |
} | |
/** | |
* A task which is executed after the delay specified with | |
* {@link Timer#newTimeout(TimerTask, long, TimeUnit)}. | |
*/ | |
static interface TimerTask { | |
/** | |
* Executed after the delay specified with | |
* {@link Timer#newTimeout(TimerTask, long, TimeUnit)}. | |
* | |
* @param timeout a handle which is associated with this task | |
* @throws java.lang.Exception | |
*/ | |
public abstract void run(Timeout timeout) throws Exception; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment