Skip to content

Instantly share code, notes, and snippets.

@iamthiago
Created March 28, 2017 17:16
Show Gist options
  • Save iamthiago/da648742696019603f6d2cdf79a9a8ac to your computer and use it in GitHub Desktop.
Save iamthiago/da648742696019603f6d2cdf79a9a8ac to your computer and use it in GitHub Desktop.
# Event-based dispatcher backed by a fork-join pool. parallelism-min and
# parallelism-max are both 1, so the factor-based thread count is capped to 1.
my-dispatcher {
# "Dispatcher" is the name of the event-based dispatcher type
type = Dispatcher
# Which kind of executor backs this dispatcher
executor = "fork-join-executor"
# Configuration for the fork-join pool
fork-join-executor {
# Lower cap applied to the factor-based parallelism (thread) number
parallelism-min = 1
# Parallelism (threads) = ceil(available processors * factor),
# then capped by parallelism-min / parallelism-max
parallelism-factor = 1.0
# Upper cap applied to the factor-based parallelism (thread) number
parallelism-max = 1
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 10
}
# Dispatcher definition that sets only a mailbox type and a throughput;
# no type or executor is configured in this block.
my-dispatcher-two {
# Mailbox implementation used with this dispatcher
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
# Maximum number of messages processed per actor before the thread
# jumps to the next actor
throughput = 512
}
package com.vivareal.columbus
import java.util.concurrent.CountDownLatch
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import akka.routing.RoundRobinPool
import com.vivareal.columbus.infrastructure.ActorCreation
import scala.concurrent.duration.Duration
/**
 * Entry point of the dispatcher/mailbox micro-benchmark.
 *
 * Starts one driver actor, sends it `Run`, blocks until the driver counts the
 * latch down, and then terminates the actor system so the JVM can exit.
 */
object CacheTest extends App {
  /** Number of messages the driver pushes through the cache actor. */
  val repeat = 1000000L
  val system = ActorSystem("cache-system")

  // Keep a handle on the latch: the driver counts it down when the run is
  // complete, and this thread waits on it before shutting the system down.
  val done = new CountDownLatch(1)

  val single = system.actorOf(Props(classOf[CacheSingle], done, repeat), "single")
  // Round-robin variant; enable it instead of `single`, since both would share the one-count latch.
  //val round = system.actorOf(Props(classOf[CacheRoundRobin], done, repeat), "round")

  single ! Run
  //round ! Run

  // Without this the latch was never awaited and the actor system was never
  // stopped, so the process kept running after the benchmark had finished.
  try done.await()
  finally system.terminate()
}
/**
 * Benchmark driver that sends `repeat` `Msg` messages to a `RoundRobinPool(10)`
 * of [[CacheActor]]s (created through the mixed-in `createActor` helper) and
 * reports how long it took until `repeat` replies came back.
 *
 * @param latch  counted down when the run is complete
 * @param repeat number of messages to send and replies to wait for
 */
class CacheRoundRobin(latch: CountDownLatch, repeat: Long) extends Actor with ActorLogging with ActorCreation {
  var sent = 0L
  var received = 0L
  // System.nanoTime() reading taken when Run arrives; only meaningful as a difference.
  var startedTime = 0L

  val cacheActor: ActorRef = createActor(RoundRobinPool(10).props(Props[CacheActor]), "cache-actor-round")

  def receive: Receive = {
    case Msg =>
      received += 1
      if (sent < repeat) {
        cacheActor ! Msg
        sent += 1
      } else if (received >= repeat) {
        reportFinished()
      }

    case Run =>
      startedTime = System.nanoTime()
      var i = 0L
      while (i < repeat) {
        cacheActor ! Msg
        sent += 1
        i += 1
      }
      // With nothing to send no reply will ever arrive, so finish right away
      // instead of leaving the latch waiting forever.
      if (repeat <= 0) reportFinished()
  }

  /** Logs the elapsed time and then releases whoever is waiting on the latch. */
  private def reportFinished(): Unit = {
    // nanoTime is monotonic, unlike the wall clock, so the difference is a reliable elapsed time.
    val elapsedMillis = (System.nanoTime() - startedTime) / 1000000L
    // Log before counting down: a waiter may shut the actor system down as soon as the latch opens.
    log.info(s"Time in ${Duration.create(elapsedMillis, "millis")}")
    latch.countDown()
  }
}
/**
 * Benchmark driver that sends `repeat` `Msg` messages to one [[CacheActor]]
 * (created through the mixed-in `createActor` helper on the
 * "my-dispatcher-two" dispatcher) and reports how long it took until `repeat`
 * replies came back.
 *
 * @param latch  counted down when the run is complete
 * @param repeat number of messages to send and replies to wait for
 */
class CacheSingle(latch: CountDownLatch, repeat: Long) extends Actor with ActorLogging with ActorCreation {
  var sent = 0L
  var received = 0L
  // System.nanoTime() reading taken when Run arrives; only meaningful as a difference.
  var startedTime = 0L

  //val cacheActor: ActorRef = createActor(Props[CacheActor].withDispatcher("my-dispatcher"), "cache-actor-single")
  val cacheActor: ActorRef = createActor(Props[CacheActor].withDispatcher("my-dispatcher-two"), "cache-actor-single")

  def receive: Receive = {
    case Msg =>
      received += 1
      if (sent < repeat) {
        cacheActor ! Msg
        sent += 1
      } else if (received >= repeat) {
        reportFinished()
      }

    case Run =>
      startedTime = System.nanoTime()
      var i = 0L
      while (i < repeat) {
        cacheActor ! Msg
        sent += 1
        i += 1
      }
      // With nothing to send no reply will ever arrive, so finish right away
      // instead of leaving the latch waiting forever.
      if (repeat <= 0) reportFinished()
  }

  /** Logs the elapsed time and then releases whoever is waiting on the latch. */
  private def reportFinished(): Unit = {
    // nanoTime is monotonic, unlike the wall clock, so the difference is a reliable elapsed time.
    val elapsedMillis = (System.nanoTime() - startedTime) / 1000000L
    // Log before counting down: a waiter may shut the actor system down as soon as the latch opens.
    log.info(s"Time in ${Duration.create(elapsedMillis, "millis")}")
    latch.countDown()
  }
}
/** Echo actor: every `Msg` it receives is sent straight back to the sender. */
class CacheActor extends Actor with ActorLogging {

  def receive: Receive = {
    case Msg =>
      //log.info(s"received msg")
      val replyTo = sender()
      replyTo.tell(Msg, self)
  }
}
// Payload bounced between the driver actors and CacheActor during a run.
case object Msg
// Command that makes a driver actor record the start time and send its batch of Msg.
case object Run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment