Last active
April 5, 2020 17:43
-
-
Save terrybleger/14aa13917d47f5bd8d6c to your computer and use it in GitHub Desktop.
Vertx - Quasar - Kotlin
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package util.func | |
import co.paralleluniverse.fibers.* | |
import co.paralleluniverse.strands.SuspendableCallable | |
import co.paralleluniverse.strands.SuspendableRunnable | |
import io.vertx.core.Context | |
import io.vertx.core.Future | |
import io.vertx.core.Vertx | |
import java.util.concurrent.Executor | |
/**
 * Runs [block] with a fresh [AsyncTask] as its receiver, so the body can call
 * [AsyncTask.await] on futures.
 *
 * The block is invoked directly on the caller; this function itself does not
 * start a fiber.
 *
 * @param block code to execute with an [AsyncTask] receiver.
 */
@Suspendable
fun async(block: AsyncTask.() -> Unit) {
    AsyncTask().block()
}
/**
 * Runs [block] with a [VertxAsyncTask] bound to [vertx] as its receiver, so
 * that `await` calls inside the block go through [FiberFactory] with that
 * Vert.x instance.
 *
 * The block is invoked directly on the caller; this function itself does not
 * start a fiber.
 *
 * @param vertx Vert.x instance handed to the [VertxAsyncTask].
 * @param block code to execute with a [VertxAsyncTask] receiver.
 */
@Suspendable
fun async(vertx: Vertx, block: VertxAsyncTask.() -> Unit) {
    VertxAsyncTask(vertx).block()
}
/**
 * Waits for [future] and returns its result.
 *
 * A new [Fiber] is started around a [FutureCallable] for the future (no
 * scheduler is passed, so Quasar picks one) and the caller waits on it with
 * `get()`.
 *
 * The name is written in backticks because `yield` is a reserved identifier in
 * current Kotlin; the function name itself is unchanged.
 *
 * @param future the Vert.x future to wait for.
 * @return the value the future completed with.
 */
@Suspendable
fun <T> `yield`(future: Future<T>): T {
    return Fiber(FutureCallable(future)).start().get()
}
/**
 * Waits for [future] and returns its result, running the waiting fiber on a
 * scheduler built by [FiberFactory] from [vertx].
 *
 * The name is written in backticks because `yield` is a reserved identifier in
 * current Kotlin; the function name itself is unchanged.
 *
 * @param vertx Vert.x instance used by [FiberFactory] to create the fiber.
 * @param future the Vert.x future to wait for.
 * @return the value the future completed with.
 */
@Suspendable
fun <T> `yield`(vertx: Vertx, future: Future<T>): T {
    return FiberFactory.create(vertx, future).start().get()
}
/**
 * Starts a fiber, created by [FiberFactory] for [vertx], that runs [callback].
 *
 * Only `start()` is called on the fiber, so this function returns without
 * waiting for [callback] to finish.
 *
 * @param vertx Vert.x instance used by [FiberFactory] to create the fiber.
 * @param callback code to run inside the fiber.
 */
@Suspendable
fun block(vertx: Vertx, callback: () -> Unit) {
    FiberFactory.create(vertx, SuspendableRunner { callback() }).start()
}
/**
 * Receiver for `async { ... }` blocks; provides [await] for waiting on futures.
 *
 * Open so that subclasses can change how the waiting fiber is created.
 */
open class AsyncTask {
    /**
     * Waits for [future] and returns its result.
     *
     * Starts a new [Fiber] around a [FutureCallable] for the future (no
     * scheduler is passed) and waits on it with `get()`.
     *
     * @param future the Vert.x future to wait for.
     * @return the value the future completed with.
     */
    @Suspendable
    open fun <T> await(future: Future<T>): T {
        return Fiber(FutureCallable(future)).start().get()
    }
}
/**
 * [AsyncTask] whose [await] creates its fiber through [FiberFactory] using the
 * given [vertx] instance.
 *
 * @property vertx Vert.x instance passed to [FiberFactory] on every [await].
 */
class VertxAsyncTask(val vertx: Vertx) : AsyncTask() {
    /**
     * Waits for [future] and returns its result, using a fiber created by
     * [FiberFactory] for [vertx].
     *
     * @param future the Vert.x future to wait for.
     * @return the value the future completed with.
     */
    @Suspendable
    override fun <T> await(future: Future<T>): T {
        return FiberFactory.create(vertx, FutureCallable(future)).start().get()
    }
}
/**
 * [Executor] adapter that hands every submitted task to a Vert.x [Context].
 *
 * @property context the Vert.x context tasks are forwarded to.
 */
class ContextExecutor(val context: Context) : Executor {
    /** Forwards [command] to [context] via `runOnContext`. */
    override fun execute(command: Runnable) =
        context.runOnContext { command.run() }
}
/**
 * Factory for fibers whose scheduler forwards work to a Vert.x context.
 *
 * Every `create` call builds a new scheduler from the given Vert.x instance.
 */
class FiberFactory {
    companion object {
        /**
         * Builds a [FiberExecutorScheduler] named "vertx" whose executor is a
         * [ContextExecutor] around `vertx.getOrCreateContext()`.
         */
        private fun createScheduler(vertx: Vertx): FiberScheduler {
            return FiberExecutorScheduler("vertx", ContextExecutor(vertx.getOrCreateContext()))
        }

        /**
         * Creates (without starting) a fiber that yields the result of [future].
         *
         * @param vertx Vert.x instance the scheduler is built from.
         * @param future the future the fiber waits for.
         */
        fun <T> create(vertx: Vertx, future: Future<T>): Fiber<T> {
            return create(vertx, FutureCallable(future))
        }

        /**
         * Creates (without starting) a [FutureFiber] that runs [callable].
         *
         * @param vertx Vert.x instance the scheduler is built from.
         * @param callable the callable the fiber executes.
         */
        fun <T> create(vertx: Vertx, callable: FutureCallable<T>): Fiber<T> {
            return FutureFiber(createScheduler(vertx), callable)
        }

        /**
         * Creates (without starting) a fiber that runs [runner].
         *
         * @param vertx Vert.x instance the scheduler is built from.
         * @param runner the runnable the fiber executes.
         */
        fun create(vertx: Vertx, runner: SuspendableRunner): Fiber<Unit> {
            return Fiber(createScheduler(vertx), runner)
        }
    }
}
/**
 * [Fiber] specialisation that runs a [FutureCallable] on the given scheduler.
 *
 * Adds no behaviour of its own; both arguments go straight to the [Fiber]
 * constructor.
 */
class FutureFiber<T>(
    scheduler: FiberScheduler,
    callable: FutureCallable<T>
) : Fiber<T>(scheduler, callable)
/**
 * [SuspendableCallable] that produces the result of a Vert.x [Future].
 *
 * @param future the future whose result [run] returns.
 */
class FutureCallable<T>(private val future: Future<T>) : SuspendableCallable<T> {
    /**
     * Delegates to a new [AsyncHandler] for the future and returns what its
     * `run()` returns.
     */
    @Suspendable
    override fun run(): T {
        return AsyncHandler(future).run()
    }
}
/**
 * Bridges a Vert.x [Future] to Quasar's [FiberAsync].
 *
 * @param future the future whose completion is reported to the [FiberAsync].
 */
class AsyncHandler<T>(private val future: Future<T>) : FiberAsync<T, Throwable>() {
    /**
     * Registers a handler on the future that reports success through
     * `asyncCompleted` and failure through `asyncFailed`.
     */
    override fun requestAsync() {
        future.setHandler { outcome ->
            if (outcome.succeeded()) {
                asyncCompleted(outcome.result())
            } else {
                asyncFailed(outcome.cause())
            }
        }
    }
}
/**
 * [SuspendableRunnable] that runs a Kotlin lambda with this runner as its
 * receiver.
 *
 * @param callback the code executed by [run].
 */
class SuspendableRunner(private val callback: SuspendableRunner.() -> Unit) : SuspendableRunnable {
    /** Invokes the wrapped callback. */
    @Suspendable
    override fun run() {
        callback()
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import co.paralleluniverse.fibers.Suspendable | |
import co.paralleluniverse.strands.dataflow.Var | |
import util.func.async | |
import util.func.block | |
import util.func.yield | |
import io.vertx.core.Vertx | |
/** Entry point: creates a Vert.x instance and runs [example] inside a fiber. */
fun main(args: Array<String>) {
    val vx = Vertx.vertx()
    // Every fiber-blocking function must be run from inside this block,
    // and every such function must be annotated with @Suspendable.
    block(vx) {
        example(vx)
    }
}
/**
 * Demonstrates the three waiting styles: a Quasar dataflow [Var], the `async`
 * block with `await`, and the standalone `yield` helper.
 *
 * @param vertx Vert.x instance used to schedule the timer.
 */
@Suspendable
fun example(vertx: Vertx) {
    // testing vars: the timer sets the Var after 1000 ms, get() waits for it
    val v = Var<Int>()
    vertx.setTimer(1000) { v.set(1) }
    println(v.get())

    // testing async
    async {
        println(await(simple(1)))
        println(await(simple(2)))
        println(await(simple(3)))
    }

    // testing yield (backticks: `yield` is a reserved identifier in current Kotlin)
    println(`yield`(simple(1)))
    println(`yield`(simple(2)))
    println(`yield`(simple(3)))
}
/**
 * Returns a Vert.x future that is already completed with [value].
 *
 * `Future` is written fully qualified because this file does not import it.
 *
 * @param value the result the returned future holds.
 * @return a future already succeeded with [value].
 */
fun <T> simple(value: T): io.vertx.core.Future<T> {
    return io.vertx.core.Future.succeededFuture(value)
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# add this to Run/Debug Configurations inside VM Options: | |
-javaagent:libs/quasar-core-0.7.2-jdk8.jar -Dco.paralleluniverse.fibers.verifyInstrumentation=true | |
# set verifyInstrumentation=true only during development;
# it helps you find out which functions still need @Suspendable
-Dco.paralleluniverse.fibers.verifyInstrumentation=true |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment