Created
March 28, 2013 02:54
-
-
Save danielhopkins/5260157 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class EchoActor extends Actor with ActorLogging { | |
val cluster = Cluster(context.system) | |
override def preStart() { cluster.subscribe(self, classOf[LeaderChanged]) } | |
override def postStop() { cluster.unsubscribe(self) } | |
def receive = { | |
case state: CurrentClusterState => log.info(s"State is $state") | |
case LeaderChanged(leader) => log.info(s"Leader is $leader") | |
case msg => { | |
log.info(s"Got msg $msg from $sender") | |
sender ! msg | |
} | |
} | |
} | |
class BroadcastMain extends Bootable { | |
val config = ConfigFactory.load | |
val system = ActorSystem("Broadcast", config.getConfig("cluster")) | |
def startup() { | |
system.actorOf(Props[EchoActor], name = "echo") | |
} | |
def shutdown() { system.shutdown() } | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class TestSender(val bridge: ActorRef) extends Actor with ActorLogging { | |
import context.dispatcher | |
override def preStart() { | |
context.system.scheduler.scheduleOnce(5 seconds) { | |
bridge ! "Hi Echo" | |
} | |
} | |
def receive = { | |
case s => {log.info("Got an echo: " + s)} | |
} | |
} | |
class LeaderSender extends Actor with ActorLogging { | |
val cluster = Cluster(context.system) | |
var currentLeader: Option[Address] = None | |
override def preStart() { cluster.subscribe(self, classOf[LeaderChanged]) } | |
override def postStop() { cluster.unsubscribe(self) } | |
def receive = { | |
case state: CurrentClusterState => currentLeader = state.leader | |
case LeaderChanged(leader) => currentLeader = leader | |
case msg => { | |
implicit val timeout = Timeout(1 second) | |
log.info(s"Sending msg from $sender") | |
currentLeader.foreach { l => | |
/* | |
Here are two different ways we can send the message | |
If I pass in a regular actorRef ("normalActorRef") | |
only the ask & pipeTo returns the message correctly. | |
If I use the externalActorRef, both of them work. | |
The second one is what I'm trying for, as it seems to be | |
more idiomatic | |
*/ | |
// leaderBroadcast(l).ask(msg).pipeTo(sender) | |
leaderBroadcast(l).forward(msg) | |
} | |
} | |
} | |
def leaderBroadcast(l: Address): ActorRef = context.actorFor(RootActorPath(l) / "user" / "echo") | |
} | |
class TestMain extends Bootable { | |
val configStr = """ | |
akka { | |
loglevel = INFO | |
actor { | |
provider = "akka.remote.RemoteActorRefProvider" | |
} | |
remote { | |
transport = "akka.remote.netty.NettyRemoteTransport" | |
netty { | |
hostname = "127.0.0.1" | |
port = 0 | |
} | |
} | |
} | |
""" | |
val config = ConfigFactory.parseString(configStr) | |
val system = ActorSystem("TestSystem", config) | |
val clusterSystem = ActorSystem("Broadcast", ConfigFactory.load.getConfig("cluster") | |
.withValue("akka.remote.netty.port", ConfigValueFactory.fromAnyRef(0))) | |
def startup() { | |
clusterSystem.actorOf(Props[LeaderSender], name = "bridge") | |
/* By using the clusterAddress to create the actorRef we appear | |
to get differently functioning actorRefs. | |
*/ | |
val clusterAddress = ExternalAddress(clusterSystem).addressForAkka | |
val externalActorRef = system.actorFor(RootActorPath(clusterAddress) / "user" / "bridge") | |
val normalActorRef = clusterSystem.actorFor("/user/bridge") | |
system.actorOf(Props(new TestSender(externalActorRef)), "testSender") | |
} | |
def shutdown() { system.shutdown() } | |
} | |
object ExternalAddress extends ExtensionKey[ExternalAddressExt] | |
class ExternalAddressExt(system: ExtendedActorSystem) extends Extension { | |
def addressForAkka: Address = system.provider.getDefaultAddress | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment