Skip to content

Instantly share code, notes, and snippets.

@kunalgrover05
Created February 17, 2017 18:08
Show Gist options
  • Save kunalgrover05/b960643679a4417eac0db240d8b6f352 to your computer and use it in GitHub Desktop.
Save kunalgrover05/b960643679a4417eac0db240d8b6f352 to your computer and use it in GitHub Desktop.
Example function for doing retries with Java8 CompletableFuture API
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Supplier;
/**
* A class which implements a Retry API for a Supplier function asynchronously on a new executor
* A retry is triggered when any of the exceptions which are specified is thrown, and retryCount is lower
* than MAX_RETRIES
* Created by grokunal on 2/17/17.
*/
public class Retry {
private static final int MAX_RETRIES = 3;
// Result returned when RejectedExecutionException is thrown or retries equal MAX_RETRIES
private static final Optional<Integer> REJECTED_RESULT = Optional.of(-1);
/**
* @param supplier A supplier which is a method which returns a result
* @param handleExceptions List of exception classes on which retry should happen
* @param retryCount The current retryValue. 0 means the first invocation
* @param executorService The executor to execute this on
*/
static CompletableFuture<Optional<Integer>> doRetries(Supplier<Integer> supplier,
List<Class<? extends Throwable>> handleExceptions, int retryCount, Executor executorService) {
if (retryCount > MAX_RETRIES) {
return CompletableFuture.completedFuture(REJECTED_RESULT);
}
// Will catch all exceptions in handleExceptions list + RejectedExecutionException,
// rest everything will returned
// Why marker here? If we directly use CompletableFuture.supplyAsync, then the exception
// is thrown on the thread invoking that. So you would need a try/catch block to handle it
CompletableFuture<Object> intermediateFuture = CompletableFuture.completedFuture("Marker")
.thenApplyAsync(x -> supplier.get(), executorService)
.handle((x, ex) -> {
if (ex != null) {
Throwable exCause = ex.getCause();
if (exCause instanceof RejectedExecutionException ||
handleExceptions.contains(exCause.getClass())) {
return Optional.empty();
}
// Return the exception otherwise
return exCause;
} else {
return Optional.of(x);
}
});
return intermediateFuture.thenCompose(x -> {
CompletableFuture<Optional<Integer>> future;
if (x instanceof Throwable) {
// Exception thrown from intermediateFuture, return that directly.
future = new CompletableFuture<>();
future.completeExceptionally((Throwable) x);
return future;
} else {
// Assuming we didn't create any other object type above
Optional<Integer> res = (Optional<Integer>) x;
if (res.isPresent()) {
return CompletableFuture.completedFuture(res);
} else {
return doRetries(supplier, handleExceptions, retryCount + 1, executorService);
}
}
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment