Last active
October 8, 2023 01:59
-
-
Save vthacker/5d153de1349eee96683d1dd18824a7b2 to your computer and use it in GitHub Desktop.
Fan out queries and aggregate the results. Some queries can take too long, and some might throw an exception.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class StructuredConcurrencyTest { | |
public static void main(String[] args) { | |
Test test = new Test(); | |
test.query(); | |
} | |
} | |
class Test { | |
public void query() { | |
try (CollectingScope<String> scope = new CollectingScope<>()) { | |
scope.fork(() -> getQuery("query1", -1)); | |
// this query should timeout since it takes too long | |
scope.fork(() -> getQuery("query2", 50)); | |
// this query is hardcoded to throw an exception | |
scope.fork(() -> getQuery("query3", -1)); | |
scope.fork(() -> getQuery("query4", -1)); | |
try { | |
scope.joinUntil(Instant.now().plusMillis(10)); | |
} catch (TimeoutException e) { | |
System.out.println("Some queries took to long so we cancelled it"); | |
} | |
// The ordering is based on whichever task completes first | |
String results = scope.completedSuccessfully() | |
.collect(Collectors.joining(", ", "{ ", " }")); | |
System.out.println(results); | |
} catch (Exception e) { | |
System.out.println(e); | |
} | |
} | |
public String getQuery(String query, long waitForMs) { | |
if (waitForMs > 0) { | |
try { | |
Thread.sleep(waitForMs); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
if ("query3".equals(query)) { | |
throw new IllegalArgumentException("Wrong query parameters"); | |
} | |
return "result_" + query; | |
} | |
} | |
class CollectingScope<T> extends StructuredTaskScope<T> { | |
private final Queue<T> subtasks = new ConcurrentLinkedQueue<>(); | |
@Override | |
protected void handleComplete(Subtask<? extends T> subtask) { | |
if (subtask.state() == Subtask.State.SUCCESS) { | |
subtasks.add(subtask.get()); | |
} | |
} | |
@Override | |
public CollectingScope<T> join() throws InterruptedException { | |
super.join(); | |
return this; | |
} | |
public Stream<T> completedSuccessfully() { | |
// If we call ensureOwnerAndJoined we get a IllegalStateException | |
// The reason being on timeout lastJoinCompleted does not get updated by design | |
// TODO: But I can't figure out a way to get the successful results otherwise | |
// super.ensureOwnerAndJoined(); | |
return subtasks.stream(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment