Skip to content

Instantly share code, notes, and snippets.

@shaoyihe
Created March 17, 2017 11:35
Show Gist options
  • Save shaoyihe/037300205ffa6b1f2f57a5b4fb0243a0 to your computer and use it in GitHub Desktop.
Save shaoyihe/037300205ffa6b1f2f57a5b4fb0243a0 to your computer and use it in GitHub Desktop.
并发任务执行
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Runs the tasks produced by an {@link Iterator} on an {@link ExecutorService}
 * using a fixed number of worker loops, then invokes an optional completion
 * callback once every worker has run out of tasks.
 *
 * <p>Each worker repeatedly pulls the next task from the shared iterator (access
 * to the iterator is serialized) and runs it. A task that throws a
 * {@link RuntimeException} is reported to the worker thread's uncaught-exception
 * handler and does not stop the remaining tasks from being executed.
 *
 * <p>A {@code null} element returned by the iterator is treated as the end of
 * the task stream by the worker that receives it.
 *
 * <p>Instances are created through {@link Builder}. When no executor is supplied,
 * a fixed thread pool is created by the builder and shut down automatically
 * after the completion step; an executor supplied by the caller is never shut
 * down by this class.
 *
 * <p>Originally dated 2017/3/17.
 */
public class TaskControl {
    private static final int DEFAULT_THREADS = Runtime.getRuntime().availableProcessors() * 2;

    private final ExecutorService executorService;
    /** True when {@link #executorService} was created by the builder and must be shut down here. */
    private final boolean ownsExecutor;
    private final int maxConcurrentThread;
    private final Iterator<Runnable> tasks;
    /** Completion callback; may be {@code null} when the caller did not configure one. */
    private final Runnable whenFinish;
    private final AtomicBoolean started = new AtomicBoolean(false);
    /** Counted down exactly once by every worker loop, whether it ends normally or not. */
    private final CountDownLatch countDownLatch;

    private TaskControl(Iterator<Runnable> tasks,
                        Runnable whenFinish,
                        int maxConcurrentThread,
                        ExecutorService executorService,
                        boolean ownsExecutor) {
        this.tasks = tasks;
        this.whenFinish = whenFinish;
        this.maxConcurrentThread = maxConcurrentThread;
        this.executorService = executorService;
        this.ownsExecutor = ownsExecutor;
        this.countDownLatch = new CountDownLatch(maxConcurrentThread);
    }

    /**
     * Submits {@code maxConcurrentThread} worker loops plus one completion job to
     * the executor and returns without waiting for them.
     *
     * <p>The completion job blocks one executor thread while it waits for the
     * workers, and it is submitted after them.
     *
     * @throws IllegalStateException if this instance has already been started
     */
    public void start() {
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("had stated");
        }
        for (int i = 0; i < maxConcurrentThread; ++i) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    drainTasks();
                }
            });
        }
        // finish job
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                awaitAndFinish();
            }
        });
    }

    /** Worker loop: runs tasks until the iterator is exhausted, then signals the latch. */
    private void drainTasks() {
        try {
            Runnable task;
            while ((task = getNextTask()) != null) {
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // One failing task must not take the worker down with it,
                    // otherwise the remaining tasks lose a worker.
                    reportFailure(e);
                }
            }
        } finally {
            // Always count down, even if the iterator itself throws; otherwise
            // the completion job would wait forever.
            countDownLatch.countDown();
        }
    }

    /** Completion job: waits for every worker, runs the callback, releases an owned executor. */
    private void awaitAndFinish() {
        try {
            countDownLatch.await();
            if (whenFinish != null) {
                whenFinish.run();
            }
        } catch (InterruptedException e) {
            // Restore the flag so the executor's own machinery can observe the interrupt.
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            reportFailure(e);
        } finally {
            if (ownsExecutor) {
                // shutdown() lets already submitted jobs finish and then frees the threads.
                executorService.shutdown();
            }
        }
    }

    /** Hands a failure to the current thread's uncaught-exception handler instead of dropping it. */
    private static void reportFailure(Throwable failure) {
        Thread current = Thread.currentThread();
        current.getUncaughtExceptionHandler().uncaughtException(current, failure);
    }

    /**
     * Returns the next task, or {@code null} when the iterator has no more elements.
     * Synchronized because the iterator is shared by all worker loops.
     */
    private synchronized Runnable getNextTask() {
        if (tasks.hasNext()) {
            return tasks.next();
        }
        return null;
    }

    /**
     * Fluent builder for {@link TaskControl}. Only {@link #tasks(Iterator)} is
     * mandatory; both thread counts default to twice the number of available
     * processors.
     */
    public static class Builder {
        private ExecutorService executorService;
        private int initThreads = DEFAULT_THREADS;
        private int maxConcurrentThread = DEFAULT_THREADS;
        private Iterator<Runnable> tasks;
        private Runnable whenFinish;

        /**
         * Sets the executor to run on. When left unset, {@link #build()} creates
         * a fixed thread pool of {@code initThreads} threads.
         */
        public Builder executorService(ExecutorService executor) {
            this.executorService = executor;
            return this;
        }

        /** Sets the size of the thread pool created when no executor is supplied. */
        public Builder initThreads(int initThreads) {
            this.initThreads = initThreads;
            return this;
        }

        /** Sets the number of worker loops submitted by {@link TaskControl#start()}. */
        public Builder maxConcurrentThread(int maxConcurrentThread) {
            this.maxConcurrentThread = maxConcurrentThread;
            return this;
        }

        /** Sets the source of tasks to execute (required). */
        public Builder tasks(Iterator<Runnable> tasks) {
            this.tasks = tasks;
            return this;
        }

        /** Sets the callback run after all workers have finished (optional). */
        public Builder whenFinish(Runnable whenFinish) {
            this.whenFinish = whenFinish;
            return this;
        }

        /**
         * Creates the configured {@link TaskControl}.
         *
         * @return a new, not yet started instance
         * @throws IllegalArgumentException if no tasks were supplied, if
         *         {@code maxConcurrentThread} is not positive, or if a pool has to
         *         be created and {@code initThreads} is not positive
         */
        public TaskControl build() {
            if (tasks == null) {
                throw new IllegalArgumentException("require tasks");
            }
            if (maxConcurrentThread <= 0) {
                throw new IllegalArgumentException(
                        "maxConcurrentThread must be positive: " + maxConcurrentThread);
            }
            // Create the pool only after validation so a rejected build leaks nothing.
            boolean ownsExecutor = executorService == null;
            ExecutorService executor = ownsExecutor
                    ? Executors.newFixedThreadPool(initThreads)
                    : executorService;
            return new TaskControl(tasks, whenFinish, maxConcurrentThread, executor, ownsExecutor);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment