Created
November 24, 2015 20:09
-
-
Save jasontedor/050e7f33e9c12d40ac5f to your computer and use it in GitHub Desktop.
InternalClusterService.java synchronization
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java | |
index 3407a57..73d014e 100644 | |
--- a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java | |
+++ b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java | |
@@ -90,7 +90,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe | |
private final Collection<ClusterStateListener> priorityClusterStateListeners = new CopyOnWriteArrayList<>(); | |
private final Collection<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>(); | |
private final Collection<ClusterStateListener> lastClusterStateListeners = new CopyOnWriteArrayList<>(); | |
- private final Map<ClusterStateTaskExecutor, List<UpdateTask>> updateTasksPerExecutor = new HashMap<>(); | |
+ private final ConcurrentMap<ClusterStateTaskExecutor, List<UpdateTask>> updateTasksPerExecutor = new ConcurrentHashMap<>(); | |
// TODO this is rather frequently changing I guess a Synced Set would be better here and a dedicated remove API | |
private final Collection<ClusterStateListener> postAppliedListeners = new CopyOnWriteArrayList<>(); | |
private final Iterable<ClusterStateListener> preAppliedListeners = Iterables.concat(priorityClusterStateListeners, clusterStateListeners, lastClusterStateListeners); | |
@@ -285,9 +285,10 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe | |
try { | |
final UpdateTask<T> updateTask = new UpdateTask<>(source, task, config, executor, listener); | |
- synchronized (updateTasksPerExecutor) { | |
- updateTasksPerExecutor.computeIfAbsent(executor, k -> new ArrayList<>()).add(updateTask); | |
- } | |
+ updateTasksPerExecutor.merge( | |
+ executor, | |
+ new ArrayList<>(Arrays.asList(updateTask)), | |
+ (oldValue, value) -> { List<UpdateTask> newValue = new ArrayList<>(oldValue); newValue.addAll(value); return newValue; }); | |
if (config.timeout() != null) { | |
updateTasksExecutor.execute(updateTask, threadPool.scheduler(), config.timeout(), new Runnable() { | |
@@ -374,21 +375,20 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe | |
<T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) { | |
final ArrayList<UpdateTask<T>> toExecute = new ArrayList<>(); | |
final ArrayList<String> sources = new ArrayList<>(); | |
- synchronized (updateTasksPerExecutor) { | |
- List<UpdateTask> pending = updateTasksPerExecutor.remove(executor); | |
- if (pending != null) { | |
- for (Iterator<UpdateTask> iter = pending.iterator(); iter.hasNext(); ) { | |
- UpdateTask task = iter.next(); | |
- if (task.processed.getAndSet(true) == false) { | |
- logger.trace("will process [{}]", task.source); | |
- toExecute.add((UpdateTask<T>) task); | |
- sources.add(task.source); | |
- } else { | |
- logger.trace("skipping [{}], already processed", task.source); | |
- } | |
+ List<UpdateTask> pending = updateTasksPerExecutor.remove(executor); | |
+ if (pending != null) { | |
+ for (Iterator<UpdateTask> iter = pending.iterator(); iter.hasNext(); ) { | |
+ UpdateTask task = iter.next(); | |
+ if (task.processed.getAndSet(true) == false) { | |
+ logger.trace("will process [{}]", task.source); | |
+ toExecute.add((UpdateTask<T>) task); | |
+ sources.add(task.source); | |
+ } else { | |
+ logger.trace("skipping [{}], already processed", task.source); | |
} | |
} | |
} | |
+ | |
if (toExecute.isEmpty()) { | |
return; | |
} | |
(END) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment