Last active
May 19, 2016 03:53
-
-
Save marccarre/b136e1e488fc52da58d102df057265b1 to your computer and use it in GitHub Desktop.
Asynchronously synchronise and aggregate intermediate results
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.marccarre; | |
import java.util.Arrays; | |
import java.util.Comparator; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.TimeUnit; | |
import java.util.stream.Stream; | |
import static java.util.concurrent.CompletableFuture.completedFuture; | |
import static java.util.concurrent.TimeUnit.SECONDS; | |
public class AsynchronousSumAndMax { | |
public static void main(final String[] args) { | |
stopwatch(() -> { | |
Stream<CompletableFuture<Integer>> xs = Stream.of(1, 2, 3, 4, 5).map( | |
x -> CompletableFuture.supplyAsync(() -> compute(x)) | |
); | |
CompletableFuture<Integer> sum = xs.reduce(completedFuture(0), (x, y) -> x.thenCombine(y, (i, j) -> i + j)); | |
CompletableFuture<Integer>[] ys = Stream.of(1, 2, 3).map( | |
x -> sum.thenApplyAsync(s -> multiply(s, x)) | |
).toArray(CompletableFuture[]::new); | |
CompletableFuture<Integer> max = CompletableFuture.allOf(ys).thenApply( | |
(Void) -> Arrays.stream(ys) | |
.map(y -> y.getNow(Integer.MAX_VALUE)) // Guaranteed to return y's value, given we synchronise with allOf, and only thenApply this function. | |
.max(Comparator.naturalOrder()) | |
.get() | |
); | |
// Block and wait for results (avoid this in production code!): | |
try { println("Result: " + max.get()); } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } | |
// Output: | |
// [ForkJoinPool.commonPool-worker-1]: Computing 1... | |
// [ForkJoinPool.commonPool-worker-2]: Computing 2... | |
// [ForkJoinPool.commonPool-worker-4]: Computing 4... | |
// [ForkJoinPool.commonPool-worker-3]: Computing 3... | |
// [ForkJoinPool.commonPool-worker-5]: Computing 5... | |
// [ForkJoinPool.commonPool-worker-1]: Computed 1. | |
// [ForkJoinPool.commonPool-worker-5]: Computed 5. | |
// [ForkJoinPool.commonPool-worker-4]: Computed 4. | |
// [ForkJoinPool.commonPool-worker-2]: Computed 2. | |
// [ForkJoinPool.commonPool-worker-3]: Computed 3. | |
// [ForkJoinPool.commonPool-worker-2]: Computing 15 * 1... | |
// [ForkJoinPool.commonPool-worker-4]: Computing 15 * 2... | |
// [ForkJoinPool.commonPool-worker-5]: Computing 15 * 3... | |
// [ForkJoinPool.commonPool-worker-4]: Computed 15 * 2 = 30. | |
// [ForkJoinPool.commonPool-worker-5]: Computed 15 * 3 = 45. | |
// [ForkJoinPool.commonPool-worker-2]: Computed 15 * 1 = 15. | |
// [main]: Result: 45 | |
// Elapsed time: 4 seconds. | |
}); | |
} | |
private static int compute(final int x) { | |
println("Computing " + x + "..."); | |
sleep(2, SECONDS); | |
println("Computed " + x + "."); | |
return x; | |
} | |
private static int multiply(final int x, final int y) { | |
println("Computing " + x + " * " + y + "..."); | |
sleep(2, SECONDS); | |
final int r = x * y; | |
println("Computed " + x + " * " + y + " = " + r + "."); | |
return r; | |
} | |
private static void sleep(final int duration, final TimeUnit unit) { | |
try { unit.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } | |
} | |
private static void println(final String message) { | |
System.out.println("[" + Thread.currentThread().getName() + "]: " + message); | |
} | |
private static void stopwatch(final Runnable action) { | |
final long begin = System.currentTimeMillis(); | |
action.run(); | |
System.out.println("Elapsed time: " + (System.currentTimeMillis() - begin) / 1000 + " seconds."); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment