Created
April 15, 2017 00:37
-
-
Save juanavelez/bf47ae84f842896350ca046a1d88ac5f to your computer and use it in GitHub Desktop.
execute-blocking-code-multiple-queues
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Similar to {@link #executeBlocking(Handler, Handler)} but when this method is called several times on the same | |
* context for the same {@code identifier}, executions associated to that value will be executed serially. | |
* However executions associated to different identifiers will be executed in parallel. | |
* @param identifier Object used to group and serialize executions | |
* @param blockingCodeHandler handler representing the blocking code to run | |
* @param resultHandler handler that will be called when the blocking code is complete | |
* @param <T> the type of the result | |
*/ | |
<T> void executeBlocking(Object identifier, Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
protected final Map<Object, TaskQueue> orderedTaskQueues = new WeakHashMap<>(); | |
@Override | |
public <T> void executeBlocking(Object identifier, Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) { | |
TaskQueue queue = orderedTaskQueues.get(identifier); | |
if (queue == null) { | |
queue = new TaskQueue(); | |
orderedTaskQueues.put(identifier, queue); | |
} | |
executeBlocking(blockingCodeHandler, queue, resultHandler); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void testExecuteBlockingMultipleQueuesViaContextCall() throws Exception { | |
ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); | |
waitFor(4); | |
CountDownLatch latch1 = new CountDownLatch(1); | |
CountDownLatch latch2 = new CountDownLatch(1); | |
CountDownLatch latch3 = new CountDownLatch(1); | |
CountDownLatch latch4 = new CountDownLatch(1); | |
String queue1 = "queue1"; | |
String queue2 = "queue2"; | |
context.executeBlocking(queue1, fut -> { | |
try { | |
awaitLatch(latch3); | |
latch1.countDown(); | |
fut.complete(); | |
} catch (Exception e) { | |
fut.fail(e); | |
} | |
}, ar -> { | |
assertTrue(ar.succeeded()); | |
complete(); | |
}); | |
context.executeBlocking(queue1, fut -> { | |
try { | |
latch1.await(); | |
latch2.countDown(); | |
fut.complete(); | |
} catch (Exception e) { | |
fut.fail(e); | |
} | |
}, ar -> { | |
assertTrue(ar.succeeded()); | |
complete(); | |
}); | |
context.executeBlocking(queue2, fut -> { | |
latch3.countDown(); | |
fut.complete(); | |
}, ar -> { | |
assertTrue(ar.succeeded()); | |
complete(); | |
}); | |
context.executeBlocking(queue2, fut -> { | |
try { | |
latch3.await(); | |
latch4.countDown(); | |
fut.complete(); | |
} catch (Exception e) { | |
fut.fail(e); | |
} | |
}, ar -> { | |
assertTrue(ar.succeeded()); | |
complete(); | |
}); | |
await(); | |
} | |
@Test | |
public void testExecuteBlockingMultipleQueuesViaVertxCall() throws Exception { | |
waitFor(4); | |
vertx.deployVerticle(new AbstractVerticle() { | |
@Override | |
public void start() throws Exception { | |
CountDownLatch latch1 = new CountDownLatch(1); | |
CountDownLatch latch2 = new CountDownLatch(1); | |
CountDownLatch latch3 = new CountDownLatch(1); | |
CountDownLatch latch4 = new CountDownLatch(1); | |
String queue1 = "queue1"; | |
String queue2 = "queue2"; | |
vertx.executeBlocking(queue1, fut -> { | |
try { | |
awaitLatch(latch3); | |
latch1.countDown(); | |
fut.complete(); | |
} catch (Exception e) { | |
fut.fail(e); | |
} | |
}, ar -> { | |
assertTrue(ar.succeeded()); | |
complete(); | |
}); | |
vertx.executeBlocking(queue1, fut -> { | |
try { | |
latch1.await(); | |
latch2.countDown(); | |
fut.complete(); | |
} catch (Exception e) { | |
fut.fail(e); | |
} | |
}, ar -> { | |
assertTrue(ar.succeeded()); | |
complete(); | |
}); | |
vertx.executeBlocking(queue2, fut -> { | |
try { | |
latch3.countDown(); | |
fut.complete(); | |
} catch (Exception e) { | |
fut.fail(e); | |
} | |
}, ar -> { | |
assertTrue(ar.succeeded()); | |
complete(); | |
}); | |
vertx.executeBlocking(queue2, fut -> { | |
try { | |
latch3.await(); | |
latch4.countDown(); | |
fut.complete(); | |
} catch (Exception e) { | |
fut.fail(e); | |
} | |
}, ar -> { | |
assertTrue(ar.succeeded()); | |
complete(); | |
}); | |
} | |
}); | |
await(); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Similar to {@link #executeBlocking(Handler, Handler)} but when this method is called several times on the same | |
* context for the same {@code identifier}, executions associated to that value will be executed serially. | |
* However executions associated to different identifiers will be executed in parallel. | |
* @param identifier Object used to group and serialize executions | |
* @param blockingCodeHandler handler representing the blocking code to run | |
* @param resultHandler handler that will be called when the blocking code is complete | |
* @param <T> the type of the result | |
*/ | |
<T> void executeBlocking(Object identifier, Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Override | |
public <T> void executeBlocking(Object identifier, | |
Handler<Future<T>> blockingCodeHandler, | |
Handler<AsyncResult<T>> asyncResultHandler) { | |
ContextImpl context = getOrCreateContext(); | |
context.executeBlocking(identifier, blockingCodeHandler, asyncResultHandler); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment