Skip to content

Instantly share code, notes, and snippets.

@krasserm
Created February 3, 2013 11:07

Revisions

  1. krasserm created this gist Feb 3, 2013.
    103 changes: 103 additions & 0 deletions parwrite.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,103 @@
    package example

    import scala.concurrent._
    import scala.concurrent.duration._

    import akka.actor._
    import akka.pattern.ask
    import akka.util.Timeout

    object Parwrite extends App {
    val system = ActorSystem("example")
    implicit val timeout = Timeout(5 seconds)

    import system.dispatcher

    val journal = system.actorOf(Props[Journal])
    val target = system.actorOf(Props(new Actor {
    def receive = {
    case msg => {
    println("received message " + msg)
    sender ! "re: " + msg
    }
    }
    }))

    1 to 100 foreach { i =>
    journal ? Write(i, target) onSuccess {
    case msg => //println("received reply " + msg)
    }
    }
    }

    case class Write(msg: Any, target: ActorRef, sequenceNr: Long = -1)
    case class Written(msg: Any, target: ActorRef)

    class Journal extends Actor {
    import context.dispatcher

    implicit val timeout = Timeout(5 seconds)

    val resequencer = context.actorOf(Props(new Resequencer))
    val writers = List.fill(5)(context.actorOf(Props(new Writer)))
    val writersCount = writers.length

    var counter = 0L

    def receive = {
    case write: Write => {
    counter += 1
    val ctr = counter
    val sdr = sender
    val idx = ctr % writersCount toInt

    val io = writers(idx) ? write.copy(sequenceNr = ctr)

    io onSuccess {
    case written: Written => {
    resequencer tell ((ctr, written), sdr)
    }
    }

    io onFailure {
    case thr => // wtf
    }
    }
    }
    }

    class Writer extends Actor {
    def receive = {
    case Write(msg, target, sequenceNr) => {
    // write to storage backend
    // ...
    // reply successful write
    sender ! Written(msg, target)
    }
    }
    }

    class Resequencer extends Actor {
    import scala.collection.mutable.Map

    private val delayed = Map.empty[Long, (Written, ActorRef)]
    private var delivered = 0L

    def receive = {
    case (seqnr: Long, written: Written) => resequence(seqnr, written, sender)
    }

    @scala.annotation.tailrec
    private def resequence(seqnr: Long, written: Written, sdr: ActorRef) {
    import written._

    if (seqnr == delivered + 1) {
    delivered = seqnr
    target tell (msg, sdr)
    } else {
    delayed += (seqnr -> (written, sender))
    }
    val eo = delayed.remove(delivered + 1)
    if (eo.isDefined) resequence(delivered + 1, eo.get._1, eo.get._2)
    }
    }