Last active
October 15, 2018 09:29
-
-
Save mgodave/7469931 to your computer and use it in GitHub Desktop.
Kotlin actor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package actors | |
import com.google.common.util.concurrent.ListenableFuture | |
import com.google.common.util.concurrent.SettableFuture | |
import com.google.common.util.concurrent.MoreExecutors | |
import java.util.concurrent.Executors | |
import java.util.ArrayList | |
import java.util.concurrent.LinkedBlockingQueue | |
import com.google.common.util.concurrent.ListeningExecutorService | |
import java.util.concurrent.ThreadFactory | |
import com.google.common.util.concurrent.ThreadFactoryBuilder | |
import futures.* | |
import java.util.concurrent.atomic.AtomicBoolean | |
object Actors { | |
private val threadFactory: ThreadFactory? = ThreadFactoryBuilder().setDaemon(true)?.build() | |
private val pool = Executors.newCachedThreadPool(threadFactory as ThreadFactory) | |
public val executor: ListeningExecutorService = MoreExecutors.listeningDecorator(pool) as ListeningExecutorService; | |
} | |
inline fun <V> ListeningExecutorService.submit(f: () -> V): ListenableFuture<V> = submit(callable(f)) | |
abstract class Actor() { | |
private data class Message(val m: Any?, val p: SettableFuture<Any>) | |
private val running = AtomicBoolean(false) | |
private val mailbox = LinkedBlockingQueue<Message>() | |
abstract fun receive(m: Any?): Any? | |
fun send(m: Any?): ListenableFuture<Any> { | |
val result = promise<Any>() | |
mailbox.put(Message(m, result)) | |
dispatch() | |
return result | |
} | |
private fun dispatch() { | |
if (!running.get() && !mailbox.isEmpty()) { | |
if (running.compareAndSet(false, true)) { | |
val messages = ArrayList<Message>() | |
val num = mailbox.drainTo(messages, 10) | |
println(num) | |
Actors.executor submit { | |
for (msg in messages) { | |
try { | |
msg.p.set(receive(msg.m)) | |
} catch (t: Throwable) { | |
msg.p.setException(t) | |
} | |
} | |
running.set(false) | |
dispatch() | |
} | |
} | |
} | |
} | |
} | |
fun actor(f: (Any?) -> Any?): Actor { | |
return object: Actor() { | |
override fun receive(m: Any?): Any? { | |
return f(m) | |
} | |
} | |
} | |
data class Stuff(val i: Int) | |
fun ringActor(next: Actor? = null): Actor { | |
return actor { m -> | |
when (next) { | |
null -> println(m) | |
else -> next send m | |
} | |
} | |
} | |
fun runRing() { | |
val last = ringActor() | |
var first: Actor = last | |
for (i in 1..1000) { | |
first = ringActor(first) | |
} | |
for (i in 1..100) { | |
first send i | |
} | |
} | |
data class PingMessage(val a: Actor) | |
data class PongMessage(val a: Actor) | |
object Ping: Actor() { | |
override fun receive(m: Any?): Unit { | |
when (m) { | |
is PongMessage -> { | |
println("Pong") | |
m.a send PingMessage(this) | |
} | |
else -> throw IllegalStateException() | |
} | |
} | |
} | |
object Pong: Actor() { | |
override fun receive(m: Any?): Unit { | |
when (m) { | |
is PingMessage -> { | |
println("Ping") | |
m.a send PongMessage(this) | |
} | |
else -> throw IllegalStateException() | |
} | |
} | |
} | |
fun runPingPong() { | |
Pong send PingMessage(Ping) | |
Thread.sleep(10000) | |
} | |
fun main(args: Array<String>): Unit { | |
val a = actor { | |
when (it) { | |
is Stuff -> "Stuff ${it.i}" | |
else -> "Nothing" | |
} | |
} | |
val f = traverse(1..100) {(i: Int) -> a send Stuff(i) } | |
f andThen { | |
val results = it as Iterable<Any?> | |
results.forEach { println(it) } | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment