Skip to content

Instantly share code, notes, and snippets.

@pkmishra
Forked from casualjim/resque.scala
Created June 9, 2013 18:56

Revisions

  1. @casualjim casualjim created this gist Mar 17, 2011.
    390 changes: 390 additions & 0 deletions resque.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,390 @@
    package com.mojolly.backchat
    package redis
    package resque

    import com.mojolly.backchat.redis.resque.Resque.{ResqueWorkerActor}
    import net.liftweb.json._
    import JsonAST._
    import JsonDSL._
    import java.net.InetAddress
    import com.redis.ds.{ RedisDeque, RedisDequeClient }
    import com.redis.RedisClient
    import java.util.{ Date, GregorianCalendar, Calendar }
    import akka.dispatch.{ Dispatchers, HawtDispatcher }
    import akka.actor._
    import Actor._
    import com.mojolly.backchat.redis.RedisNamespace
    import actor.{ScheduledTask, Supervising, Supervised}
    import java.util.concurrent.{TimeUnit, ScheduledFuture, ConcurrentHashMap}

    object Resque {

    private object Meta {

    /*
    * For converting scala objects into DBObject values
    */
    object Reflection {

    /*
    * These don't require a conversion and can be put directly into a DBObject
    */
    val primitives = Set[Class[_]](classOf[String], classOf[Int], classOf[Long], classOf[Double],
    classOf[Float], classOf[Byte], classOf[BigInt], classOf[Boolean],
    classOf[Short], classOf[java.lang.Integer], classOf[java.lang.Long],
    classOf[java.lang.Double], classOf[java.lang.Float],
    classOf[java.lang.Byte], classOf[java.lang.Boolean],
    classOf[java.lang.Short])

    def primitive_?(clazz: Class[_]) = primitives contains clazz

    /*
    * This is used to convert DBObjects into JObjects
    */
    def primitive2jvalue(a: Any) = a match {
    case x: String => JString(x)
    case x: Int => JInt(x)
    case x: Long => JInt(x)
    case x: Double => JDouble(x)
    case x: Float => JDouble(x)
    case x: Byte => JInt(BigInt(x))
    case x: BigInt => JInt(x)
    case x: Boolean => JBool(x)
    case x: Short => JInt(BigInt(x))
    case x: java.lang.Integer => JInt(BigInt(x.asInstanceOf[Int]))
    case x: java.lang.Long => JInt(BigInt(x.asInstanceOf[Long]))
    case x: java.lang.Double => JDouble(x.asInstanceOf[Double])
    case x: java.lang.Float => JDouble(x.asInstanceOf[Float])
    case x: java.lang.Byte => JInt(BigInt(x.asInstanceOf[Byte]))
    case x: java.lang.Boolean => JBool(x.asInstanceOf[Boolean])
    case x: java.lang.Short => JInt(BigInt(x.asInstanceOf[Short]))
    case x if datetype_?(x.asInstanceOf[AnyRef].getClass) => datetype2jvalue(x)
    case _ => error("not a primitive " + a.asInstanceOf[AnyRef].getClass)
    }

    /*
    * Date types require formatting
    */
    val datetypes = Set[Class[_]](classOf[Calendar], classOf[Date], classOf[GregorianCalendar])

    def datetype_?(clazz: Class[_]) = datetypes contains clazz

    def datetype2jvalue(a: Any)(implicit formats: Formats) = a match {
    case x: Calendar => dateAsJValue(x.getTime)
    case x: Date => dateAsJValue(x)
    }

    def dateAsJValue(d: Date)(implicit formats: Formats) =
    JObject(JField("$dt", JString(formats.dateFormat.format(d))) :: Nil)
    }
    }

    implicit def namespaceToString(ns: RedisNamespace): String = ns.toString
    import messages._

    private var _resque: ActorRef = null
    private var _resqueSupervisor: ActorRef = null

    private def resque: ActorRef = {
    if (_resque.isNull) {
    _resque = addToSupervisor(actorOf(new Resque(RedisConfig())))
    }
    _resque
    }

    private def addToSupervisor(actor: ActorRef) = {
    if (_resqueSupervisor.isNull) {
    _resqueSupervisor = actorOf(new Actor with Supervising { }).start
    Runtime.getRuntime.addShutdownHook(new Thread {
    override def run = {
    Actor.registry.actorsFor(classOf[ResqueListener]) foreach { worker =>
    Option(_resque) foreach { _ ! StopWorker(worker) }
    }
    }
    })
    }
    actor.start
    _resqueSupervisor ! Link(actor)
    actor
    }

    def resque_=(res: ActorRef) = _resque = res

    def register[Worker <: ResqueWorker](worker: => Worker)(implicit mf: Manifest[Worker]){
    val (job, q) = getForQueue(worker)
    resque ! Reserve(q.name, job)
    }

    private def getForQueue[Worker <: ResqueWorker](worker: => Worker)(implicit mf: Manifest[Worker]) = {
    val a = actorOf(worker)
    val q = queueJobMap.get(Symbol(mf.erasure.getSimpleName))
    (a, q)
    }

    def enqueue[Worker <: ResqueWorker](worker: => Worker, args: Any*)(implicit mf: Manifest[Worker]) = {
    val (job, q) = getForQueue(worker)
    resque ! Enqueue(q.name, job.id, args.toList)
    }

    object Naming {

    object QueueName {
    def apply(name: String)(implicit resqueConfig: ResqueConfig) = {
    ( resqueConfig.namespace :: queue :: name :: Nil) mkString keySeparator
    }
    }

    object StatKey {

    def apply(statistic: String)(implicit resqueConfig: ResqueConfig) = {
    (resqueConfig.namespace :: stat :: statistic :: Nil) mkString keySeparator
    }

    def apply(statistic: String, workerId: String)(implicit resqueConfig: ResqueConfig) = {
    (resqueConfig.namespace :: stat :: statistic :: workerId :: Nil) mkString keySeparator
    }
    }

    object WorkerKey {
    def apply(workerId: String)(implicit resqueConfig: ResqueConfig) = {
    (resqueConfig.namespace :: worker :: workerId :: Nil) mkString keySeparator
    }
    }

    object WorkerId extends {
    def apply(pid: String, queueName: String) = {
    (hostName :: pid :: queueName :: Nil) mkString keySeparator
    }
    }

    object WorkerSet {
    def apply()(implicit resqueConfig: ResqueConfig) = (resqueConfig.namespace :: workers :: Nil) mkString keySeparator
    }


    }

    trait DefaultResqueConfig {
    implicit val resqueConfig = ResqueConfig()
    }

    private object messages {

    sealed trait ResqueMessage {
    def toJson = Serialization.write(this)
    }

    case object Poll extends ResqueMessage
    case class Perform(data: String) extends ResqueMessage
    case class Reserve(queue: String, worker: ActorRef) extends ResqueMessage
    case class StopWorker(worker: ActorRef) extends ResqueMessage
    case object StartResque extends ResqueMessage
    case class StartedWorker(worker: ActorRef) extends ResqueMessage
    case object Stop extends ResqueMessage
    case class Enqueue(queue: String, klass: String, args: List[Any]) extends ResqueMessage

    sealed trait JobResult

    case class JobFailure(
    payload: String, queue: String, worker: ActorRef,
    error: String, backtrace: List[String], failed_at: DateTime = DateTime.now) extends JobResult {
    def toJson = Serialization.write(this)
    }
    object JobFailure {
    def apply(payload: String, queue: String, worker: ActorRef, exception: Throwable): JobFailure =
    apply(payload, queue, worker, exception.getMessage, exception.getStackTrace.map(_.toString).toList)
    }
    object JobSuccess extends JobResult
    case class Success(worker: ActorRef) extends JobResult

    sealed trait JobProcessingMessage {
    def toJson = Serialization.write(this)
    }
    case class WorkingOn(worker: ActorRef, queue: String, payload: JValue, run_at: DateTime = DateTime.now) extends JobProcessingMessage
    }

    private val hawt = new HawtDispatcher

    class Resque(config: RedisConfig) extends Actor with DefaultResqueConfig with Supervising {

    self.dispatcher = hawt

    import Resque.Naming._
    protected val resqueClient = new RedisDequeClient(config.host, config.port)

    private lazy val redisActor = {
    val a = actorOf(new Actor {

    self.dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher

    protected val redis = new RedisClient(config.host, config.port)

    protected def receive = {
    case m: JobFailure => {
    val msg: JValue = ("failed_at" -> m.failed_at.toString(ISO8601_DATE)) ~
    ("payload" -> m.payload) ~
    ("error" -> m.error) ~
    ("backtrace" -> m.backtrace) ~
    ("worker" -> m.worker.id) ~
    ("queue" -> m.queue)
    redis.rpush("resque:failed", Printer.compact(render(msg)))
    }
    case Success(worker) => {
    redis.incr(StatKey("processed", worker.id))
    redis.incr(StatKey("processed"))
    redis.del(WorkerKey(worker.id))
    }
    case m @ WorkingOn(worker, queue, payload, runAt) => {
    val msg: JValue = ("queue" -> QueueName(queue)) ~ ("payload" -> payload) ~ ("run_at" -> runAt.toString(ISO8601_DATE))
    redis.set(WorkerKey(worker.id), Printer.compact(render(msg)))
    }
    case StopWorker(worker) => {
    redis.srem(WorkerSet(), worker.id)
    redis.del(WorkerKey(worker.id) + ":started")
    }
    case StartedWorker(worker) => {
    redis.sadd(WorkerSet(), worker.id)
    redis.set(WorkerKey(worker.id + ":started"), DateTime.now.toString(ISO8601_DATE))
    }
    case Enqueue(queueName, job, data) => {
    val json: JValue = ("class" -> job) ~ ("args" -> data.map(Meta.Reflection.primitive2jvalue _))
    log debug ("Enqueuing [%s] on [%s] with key [%s]:\n%s", job, queueName, QueueName(queueName), json.toPrettyJson)
    redis.sadd(ResqueNamespace("queues").toString, queueName)
    redis.rpush(QueueName(queueName), Printer.compact(render(json)) )

    }

    }
    })
    self startLink a
    a
    }

    override protected def receive = {
    case Reserve(queueName, worker) => {
    val queueListener = actorOf(new ResqueListener(queueName, resqueClient.getDeque(QueueName(queueName))))
    self startLink queueListener
    self.startLink(worker)
    queueListener ! Poll
    redisActor ! StartedWorker(queueListener)
    }
    case m: JobFailure => {
    redisActor ! m
    }
    case m: Success => {
    redisActor ! m
    }
    case m: WorkingOn => {
    redisActor ! m
    }
    case m @ StopWorker(worker) => {
    redisActor ! m
    self ! UnlinkAndStop(worker)
    }
    case m: StartedWorker => {
    redisActor ! m
    }
    case m: Enqueue => redisActor ! m
    case Stop => {
    self.shutdownLinkedActors
    self.supervisor foreach { _ ! UnlinkAndStop(self) }
    }
    }
    }

    val queueJobMap = new ConcurrentHashMap[Symbol, Symbol]

    private class ResqueListener(queue: String, redisDeque: RedisDeque[String]) extends Actor with Supervised {

    self.dispatcher = hawt

    protected val timeout = akka.util.duration.pairIntToDuration(250 -> TimeUnit.MILLISECONDS)

    import Naming.{ WorkerId}
    self.id = WorkerId(self.uuid.toString, queue)

    protected def schedulePoll = {
    currentScheduler = Scheduler.scheduleOnce(() => self ! Poll, timeout.length, timeout.unit)
    }

    protected var currentScheduler: ScheduledFuture[AnyRef] = null

    protected def receive = {
    case Poll => {
    val polled = redisDeque.pollFirst
    polled foreach { x =>
    val json = JsonParser.parse(x)
    val jn = (json \ "class").extract[String]
    val actors = Actor.registry.actorsFor(jn)
    val args = Perform(x)
    actors foreach { _ ! args }
    self.supervisor foreach { _ ! WorkingOn(self, queue, json)}
    }
    if (polled.isEmpty) schedulePoll
    }
    case f: JobFailure => {
    self.supervisor foreach { _ ! f.copy(worker = self)}
    self ! Poll
    }
    case s: Success => {
    self.supervisor foreach { _ ! s.copy(worker = self)}
    self ! Poll
    }
    }


    override def postStop = {
    if(currentScheduler != null && !(currentScheduler.isCancelled || currentScheduler.isDone)) currentScheduler.cancel(false)
    }
    }


    lazy val hostName = InetAddress.getLocalHost.getHostName
    private val queue = "queue".intern
    private val stat = "stat".intern
    private val worker = "worker".intern
    private val keySeparator = ":".intern
    private val queueSeparator = "$".intern
    private val workers = "workers".intern
    private val payload = "payload".intern

    private[resque] trait ResqueWorkerActor extends Actor with Supervised {

    protected val queue: Symbol
    self.id = getClass.getSimpleName

    final protected def receive = {
    case Perform(data) => {
    log info ("Worker [%s] received:\n%s", self.id, data)
    try {
    val json = JsonParser.parse(data)
    perform((json \ "args").children.head.values.asInstanceOf[List[Any]]:_ *)
    notifyOther(Success(self))
    } catch {
    case e => {
    notifyOther(JobFailure(data, queue.name, self, e))
    log.error(e, "There was an error in job: %s", self.id)
    }
    }
    }
    }

    private def notifyOther[T](msg: T) {
    self.sender foreach { _ ! msg }
    }

    protected def perform(data: Any*): Unit

    }
    }

    object ResqueNamespace extends RedisNamespace("resque")


    case class ResqueConfig(namespace: String)
    object ResqueConfig { def apply(): ResqueConfig = apply("resque") }

    abstract class ResqueWorker(protected val queue: Symbol) extends ResqueWorkerActor {
    Resque.queueJobMap.put(Symbol(self.id), queue)
    }
    case class RedisConfig(host: String = "localhost", port: Int = 6379)