Last active
August 29, 2015 13:56
-
-
Save ericacm/9338232 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.atomic.AtomicInteger | |
import concurrent.{Future, Promise} | |
import akka.actor._ | |
import concurrent.duration._ | |
import util.control.NoStackTrace | |
/** | |
* NOTE: `RemoteActorResolver` has been superseded by `actorSelection.resolveOne` | |
*/ | |
object RemoteActorResolver { | |
val resolverCount = new AtomicInteger(0) | |
type LookupMap = Map[ActorPath, Promise[ActorRef]] | |
class ActorResolutionException(message: String) extends Exception(message) with NoStackTrace | |
def props(lookup: LookupMap, timeout: Duration): Props = Props(classOf[RemoteActorResolver], lookup, timeout) | |
/** | |
* Start resolving ActorPaths to ActorRefs | |
* | |
* @param factory `ActorContext` or `ActorSystem` | |
* @param paths Collection of `ActorPaths` to resolve | |
* @param timeout Timeout after which promises will be failed | |
* @return Map of `ActorPath` to `Promise[ActorRef]` where the `ActorPaths` are the `paths` | |
* and the promises will be completed when the paths are resolved (or | |
* failed if the timeout is reached). | |
*/ | |
def startResolution(factory: ActorRefFactory, paths: Seq[ActorPath], timeout: Duration = 5.minutes): LookupMap = { | |
val lookup = paths.map(_ -> Promise[ActorRef]()).toMap | |
factory.actorOf(props(lookup, timeout), "remoteActorResolver" + resolverCount.incrementAndGet()) | |
lookup | |
} | |
/** | |
* Resolve an ActorPath asynchronously | |
* | |
* @param lookup Map returned from `StartResolution` | |
* @param path One of the ActorPaths in the `lookup` map | |
* @return A `Future[ActorRef]` that will be completed when the `ActorPath` is resolved. | |
*/ | |
def resolvePathAsync(lookup: LookupMap, path: ActorPath): Future[ActorRef] = lookup(path).future | |
} | |
/** | |
* Using `Identify`/`ActorIdentity`, try to resolve all of the `ActorPath`s in lookup map. | |
* As each `ActorPath` is successfully or unsuccessfully resolved its corresponding `Promise` | |
* is completed or failed, respectively. | |
* Stops self after the last path is resolved. | |
* | |
* @param lookup Prepopulated map of `ActorPath` to `Promise[ActorRef]` | |
* @param timeout After timeout all promises corresponding to unresolved paths will be failed and | |
* this actor will stop itself. | |
*/ | |
class RemoteActorResolver(lookup: LookupMap, timeout: Duration) extends Actor with ActorLogging { | |
val cycleTimeout = 5.seconds | |
var elapsed = Duration.Zero | |
context setReceiveTimeout cycleTimeout | |
sendIdentify() | |
def sendIdentify() = lookup foreach { case (path, promise) => | |
if (!promise.isCompleted) context.system.actorSelection(path) ! Identify(path) | |
} | |
def stopIfFinished(): Unit = if (lookup.values.forall(_.isCompleted)) context stop self | |
def exception(message: String) = new ActorResolutionException(message) | |
def receive = { | |
case ActorIdentity(path: ActorPath, Some(ref)) => | |
lookup(path) trySuccess ref | |
stopIfFinished() | |
case ActorIdentity(path: ActorPath, None) => | |
lookup(path) tryFailure exception(s"No actor exists at $path") | |
stopIfFinished() | |
case ReceiveTimeout => | |
elapsed += cycleTimeout | |
if (elapsed >= timeout) { | |
lookup filterNot(_._2.isCompleted) foreach { case (path, promise) => | |
promise tryFailure exception(s"Timeout waiting for $path to be resolved") | |
} | |
context stop self | |
} else { | |
sendIdentify() | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment