Skip to content

Instantly share code, notes, and snippets.

@ericacm
Last active August 29, 2015 13:56

Revisions

  1. ericacm revised this gist Apr 11, 2014. 1 changed file with 28 additions and 21 deletions.
    49 changes: 28 additions & 21 deletions RemoteActorResolver.scala
    Original file line number Diff line number Diff line change
    @@ -4,31 +4,33 @@ import akka.actor._
    import concurrent.duration._
    import util.control.NoStackTrace

    /**
    * NOTE: `RemoteActorResolver` has been superseded by `actorSelection.resolveOne`
    */
    object RemoteActorResolver {
    val resolverCount = new AtomicInteger(0)

    type LookupMap = Map[ActorPath, Promise[ActorRef]]

    class ActorResolutionException(message: String) extends Exception(message) with NoStackTrace

    def props(lookup: LookupMap, timeout: Duration): Props = Props(classOf[RemoteActorResolver], lookup, timeout)
    def props(lookup: LookupMap, timeout: Duration, enableNoMatch: Boolean): Props =
    Props(classOf[RemoteActorResolver], lookup, timeout, enableNoMatch)

    /**
    * Start resolving ActorPaths to ActorRefs
    *
    * @param factory `ActorContext` or `ActorSystem`
    * @param paths Collection of `ActorPaths` to resolve
    * @param timeout Timeout after which promises will be failed
    * @return Map of `ActorPath` to `Promise[ActorRef]` where the `ActorPaths` are the `paths`
    * and the promises will be completed when the paths are resolved (or
    * failed if the timeout is reached).
    * @param factory `ActorContext` or `ActorSystem`
    * @param paths Collection of `ActorPaths` to resolve
    * @param timeout Timeout after which promises will be failed
    * @param enableNoMatch If `true` then receiving `ActorIdentity(path, None)` will cause resolution
    * for the path to fail. Otherwise it is ignored and only timeouts cause lookup
    * failures.
    * @return Map of `ActorPath` to `Promise[ActorRef]` where the `ActorPaths` are
    * the `paths` and the promises will be completed when the paths are
    * resolved (or failed if the timeout is reached).
    */
    def startResolution(factory: ActorRefFactory, paths: Seq[ActorPath], timeout: Duration = 5.minutes): LookupMap = {
    def startResolution(factory: ActorRefFactory, paths: Seq[ActorPath], timeout: Duration = 5.minutes,
    enableNoMatch: Boolean = false): LookupMap = {
    val lookup = paths.map(_ -> Promise[ActorRef]()).toMap
    factory.actorOf(props(lookup, timeout), "remoteActorResolver" + resolverCount.incrementAndGet())
    factory.actorOf(props(lookup, timeout, enableNoMatch), "remoteActorResolver" + resolverCount.incrementAndGet())
    lookup
    }

    @@ -46,15 +48,20 @@ object RemoteActorResolver {
    * Using `Identify`/`ActorIdentity`, try to resolve all of the `ActorPath`s in lookup map.
    * As each `ActorPath` is successfully or unsuccessfully resolved its corresponding `Promise`
    * is completed or failed, respectively.
    *
    * Stops self after the last path is resolved.
    *
    * @param lookup Prepopulated map of `ActorPath` to `Promise[ActorRef]`
    * @param timeout After timeout all promises corresponding to unresolved paths will be failed and
    * this actor will stop itself.
    * Note: Ignores the case where `ActorIdentity(_, None)` is received. This is because the actor might
    * not be created on the target system yet. TODO: maybe handle it optionally.
    *
    * @param lookup Prepopulated map of `ActorPath` to `Promise[ActorRef]`
    * @param timeout After timeout all promises corresponding to unresolved paths will be failed and
    * this actor will stop itself.
    * @param enableNoMatch If `true` then receiving `ActorIdentity(path, None)` will cause resolution
    * for the path to fail. Otherwise it is ignored and only timeouts cause lookup
    * failures.
    */
    class RemoteActorResolver(lookup: LookupMap, timeout: Duration) extends Actor with ActorLogging {
    import RemoteActorResolver._

    class RemoteActorResolver(lookup: LookupMap, timeout: Duration, enableNoMatch: Boolean) extends Actor with ActorLogging {
    val cycleTimeout = 5.seconds
    var elapsed = Duration.Zero

    @@ -64,17 +71,17 @@ class RemoteActorResolver(lookup: LookupMap, timeout: Duration) extends Actor wi
    def sendIdentify() = lookup foreach { case (path, promise) =>
    if (!promise.isCompleted) context.system.actorSelection(path) ! Identify(path)
    }

    def stopIfFinished(): Unit = if (lookup.values.forall(_.isCompleted)) context stop self

    def exception(message: String) = new ActorResolutionException(message)

    def receive = {
    case ActorIdentity(path: ActorPath, Some(ref)) =>
    lookup(path) trySuccess ref
    stopIfFinished()

    case ActorIdentity(path: ActorPath, None) =>
    case ActorIdentity(path: ActorPath, None) if enableNoMatch =>
    lookup(path) tryFailure exception(s"No actor exists at $path")
    stopIfFinished()

  2. ericacm revised this gist Mar 4, 2014. 1 changed file with 2 additions and 1 deletion.
    3 changes: 2 additions & 1 deletion RemoteActorResolver.scala
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,3 @@

    import java.util.concurrent.atomic.AtomicInteger
    import concurrent.{Future, Promise}
    import akka.actor._
    @@ -54,6 +53,8 @@ object RemoteActorResolver {
    * this actor will stop itself.
    */
    class RemoteActorResolver(lookup: LookupMap, timeout: Duration) extends Actor with ActorLogging {
    import RemoteActorResolver._

    val cycleTimeout = 5.seconds
    var elapsed = Duration.Zero

  3. ericacm created this gist Mar 4, 2014.
    91 changes: 91 additions & 0 deletions RemoteActorResolver.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,91 @@

    import java.util.concurrent.atomic.AtomicInteger
    import concurrent.{Future, Promise}
    import akka.actor._
    import concurrent.duration._
    import util.control.NoStackTrace

    /**
    * NOTE: `RemoteActorResolver` has been superseded by `actorSelection.resolveOne`
    */
    object RemoteActorResolver {
    val resolverCount = new AtomicInteger(0)

    type LookupMap = Map[ActorPath, Promise[ActorRef]]

    class ActorResolutionException(message: String) extends Exception(message) with NoStackTrace

    def props(lookup: LookupMap, timeout: Duration): Props = Props(classOf[RemoteActorResolver], lookup, timeout)

    /**
    * Start resolving ActorPaths to ActorRefs
    *
    * @param factory `ActorContext` or `ActorSystem`
    * @param paths Collection of `ActorPaths` to resolve
    * @param timeout Timeout after which promises will be failed
    * @return Map of `ActorPath` to `Promise[ActorRef]` where the `ActorPaths` are the `paths`
    * and the promises will be completed when the paths are resolved (or
    * failed if the timeout is reached).
    */
    def startResolution(factory: ActorRefFactory, paths: Seq[ActorPath], timeout: Duration = 5.minutes): LookupMap = {
    val lookup = paths.map(_ -> Promise[ActorRef]()).toMap
    factory.actorOf(props(lookup, timeout), "remoteActorResolver" + resolverCount.incrementAndGet())
    lookup
    }

    /**
    * Resolve an ActorPath asynchronously
    *
    * @param lookup Map returned from `StartResolution`
    * @param path One of the ActorPaths in the `lookup` map
    * @return A `Future[ActorRef]` that will be completed when the `ActorPath` is resolved.
    */
    def resolvePathAsync(lookup: LookupMap, path: ActorPath): Future[ActorRef] = lookup(path).future
    }

    /**
    * Using `Identify`/`ActorIdentity`, try to resolve all of the `ActorPath`s in lookup map.
    * As each `ActorPath` is successfully or unsuccessfully resolved its corresponding `Promise`
    * is completed or failed, respectively.
    * Stops self after the last path is resolved.
    *
    * @param lookup Prepopulated map of `ActorPath` to `Promise[ActorRef]`
    * @param timeout After timeout all promises corresponding to unresolved paths will be failed and
    * this actor will stop itself.
    */
    class RemoteActorResolver(lookup: LookupMap, timeout: Duration) extends Actor with ActorLogging {
    val cycleTimeout = 5.seconds
    var elapsed = Duration.Zero

    context setReceiveTimeout cycleTimeout
    sendIdentify()

    def sendIdentify() = lookup foreach { case (path, promise) =>
    if (!promise.isCompleted) context.system.actorSelection(path) ! Identify(path)
    }

    def stopIfFinished(): Unit = if (lookup.values.forall(_.isCompleted)) context stop self

    def exception(message: String) = new ActorResolutionException(message)

    def receive = {
    case ActorIdentity(path: ActorPath, Some(ref)) =>
    lookup(path) trySuccess ref
    stopIfFinished()

    case ActorIdentity(path: ActorPath, None) =>
    lookup(path) tryFailure exception(s"No actor exists at $path")
    stopIfFinished()

    case ReceiveTimeout =>
    elapsed += cycleTimeout
    if (elapsed >= timeout) {
    lookup filterNot(_._2.isCompleted) foreach { case (path, promise) =>
    promise tryFailure exception(s"Timeout waiting for $path to be resolved")
    }
    context stop self
    } else {
    sendIdentify()
    }
    }
    }