Skip to content

Instantly share code, notes, and snippets.

@hemantsonu20
Created September 9, 2022 10:57
Show Gist options
  • Save hemantsonu20/8a5195fc005ddaf8373c79888982633b to your computer and use it in GitHub Desktop.
Save hemantsonu20/8a5195fc005ddaf8373c79888982633b to your computer and use it in GitHub Desktop.
Job Scheduler With Delay
package com.example.demo;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;
/**
 * Demo of a small delayed-job scheduler built on a priority queue, a lock/condition pair
 * and a fixed thread pool.
 */
public class Main {

    /** Accepts jobs that should run once a delay has elapsed. */
    interface JobScheduler {

        /**
         * Schedules {@code job} to run no earlier than {@code delayMs} milliseconds from now.
         *
         * @param job     the work to execute
         * @param delayMs delay in milliseconds; zero or negative means "as soon as possible"
         */
        void schedule(Runnable job, long delayMs);
    }

    /**
     * A job paired with the wall-clock time (from {@link System#currentTimeMillis()}) at which
     * it becomes due. Tasks order by due time, earliest first.
     */
    public static class DelayTask implements Comparable<DelayTask> {

        private final Runnable runnable;
        private final long executionTime;

        /**
         * @param runnable the job to run
         * @param delayMs  delay from now in milliseconds; zero or negative means already due
         * @throws NullPointerException if {@code runnable} is null
         */
        public DelayTask(Runnable runnable, long delayMs) {
            this.runnable = Objects.requireNonNull(runnable, "runnable");
            // Saturate instead of wrapping: a huge delay must mean "far in the future",
            // not a negative timestamp that looks already due.
            this.executionTime = saturatedAdd(System.currentTimeMillis(), delayMs);
        }

        /** Returns {@code true} once the due time has been reached. */
        public boolean shouldRun() {
            return this.executionTime <= System.currentTimeMillis();
        }

        /** Returns the milliseconds remaining until the task is due, or 0 if it already is. */
        public long getWaitMs() {
            long now = System.currentTimeMillis();
            return this.executionTime <= now ? 0 : this.executionTime - now;
        }

        /** Returns the wrapped job. */
        public Runnable getRunnable() {
            return runnable;
        }

        @Override
        public int compareTo(DelayTask o) {
            return Long.compare(this.executionTime, o.executionTime);
        }

        /** Adds two longs, clamping to {@code Long.MIN_VALUE}/{@code Long.MAX_VALUE} on overflow. */
        private static long saturatedAdd(long a, long b) {
            long sum = a + b;
            // Overflow happened iff both operands have the same sign and the sum's sign differs.
            if (((a ^ sum) & (b ^ sum)) < 0) {
                return a < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
            }
            return sum;
        }
    }

    /**
     * {@link JobScheduler} that keeps pending tasks in a priority queue and hands each one to
     * a fixed pool of five threads when it becomes due.
     *
     * <p>One pool thread is permanently occupied by the dispatcher loop ({@link #start()}),
     * leaving four threads to run jobs. All access to the queue and to {@code running} happens
     * while holding {@code lock}.
     */
    public static class DelayJobScheduler implements JobScheduler {

        private final Queue<DelayTask> queue = new PriorityBlockingQueue<>();
        private final ExecutorService executorService = Executors.newFixedThreadPool(5);
        private final Lock lock = new ReentrantLock();
        private final Condition available = lock.newCondition();
        /** {@code false} once {@link #shutdown()} was called or the dispatcher exited; guarded by {@code lock}. */
        private boolean running = true;

        /** Creates the scheduler and starts its dispatcher loop on a pool thread. */
        public DelayJobScheduler() {
            executorService.submit(this::start);
        }

        /**
         * {@inheritDoc}
         *
         * @throws NullPointerException  if {@code job} is null
         * @throws IllegalStateException if the scheduler has been shut down
         */
        @Override
        public void schedule(Runnable job, long delayMs) {
            // Validate before queueing: a null job would otherwise blow up later on the
            // dispatcher thread and silently stop the whole scheduler.
            DelayTask task = new DelayTask(job, delayMs);
            lock.lock();
            try {
                if (!running) {
                    throw new IllegalStateException("scheduler has been shut down");
                }
                queue.offer(task);
                // Only a new head can shorten the dispatcher's current wait.
                if (queue.peek() == task) {
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
        }

        /**
         * Stops accepting new jobs. Jobs that are already scheduled still run at their due
         * time; once the queue is drained the dispatcher exits and the thread pool is shut
         * down, allowing the JVM to terminate. Calling this more than once is harmless.
         */
        public void shutdown() {
            lock.lock();
            try {
                running = false;
                // Wake the dispatcher so it can notice the state change if it is idle.
                available.signalAll();
            } finally {
                lock.unlock();
            }
        }

        /**
         * Dispatcher loop: repeatedly waits for the next due task and submits it to the pool.
         * Exits when the scheduler is shut down and drained, or when the thread is interrupted.
         */
        void start() {
            try {
                while (true) {
                    DelayTask next = this.take();
                    if (next == null) {
                        break; // shut down and nothing left to run
                    }
                    executorService.submit(next.getRunnable());
                }
            } catch (InterruptedException ex) {
                // Treat interruption as a request to stop; keep the flag for the pool worker.
                Thread.currentThread().interrupt();
            } finally {
                // Whatever ended the loop, nobody will dispatch anymore: refuse further jobs
                // and let the pool threads wind down.
                lock.lock();
                try {
                    running = false;
                } finally {
                    lock.unlock();
                }
                executorService.shutdown();
            }
        }

        /**
         * Blocks until the earliest task is due, then removes and returns it.
         *
         * @return the next due task, or {@code null} if the scheduler has been shut down and
         *         no tasks remain
         * @throws InterruptedException if interrupted while waiting
         */
        DelayTask take() throws InterruptedException {
            lock.lock();
            try {
                while (true) {
                    DelayTask head = queue.peek();
                    if (head == null) {
                        if (!running) {
                            return null;
                        }
                        // Queue is empty: sleep until schedule() or shutdown() signals.
                        available.await();
                        continue;
                    }
                    long waitMs = head.getWaitMs();
                    if (waitMs <= 0) {
                        return queue.poll();
                    }
                    // Sleep until the head is due, or earlier if a sooner task arrives.
                    available.await(waitMs, TimeUnit.MILLISECONDS);
                }
            } finally {
                lock.unlock();
            }
        }
    }

    /** Sample job that prints the current time in milliseconds followed by its number. */
    public static class RunnableTask implements Runnable {

        private final int num;

        public RunnableTask(int num) {
            this.num = num;
        }

        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ": " + num);
        }
    }

    /** Schedules ten jobs one second apart, then shuts down so the JVM exits after the last one. */
    public static void main(String[] args) {
        DelayJobScheduler jobScheduler = new DelayJobScheduler();
        IntStream.range(0, 10).forEach(num -> jobScheduler.schedule(new RunnableTask(num), num * 1000L));
        jobScheduler.shutdown();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment