Created
January 30, 2017 09:24
-
-
Save sosuren/5556ab763689056d7f464856fe2beac7 to your computer and use it in GitHub Desktop.
Akka Streaming Sample to find distribution of Patients' Health based on Age and Gender
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.stream.stage.{InHandler, GraphStageLogic, GraphStage} | |
import sample.stream.AkkaStreamingSample.Gender.Gender | |
import scala.util.Random | |
object AkkaStreamingSample { | |
import akka.actor.ActorSystem | |
import akka.stream._ | |
import akka.stream.scaladsl._ | |
// implicit actor system | |
implicit val system = ActorSystem("Sample") | |
// implicit actor materializer | |
implicit val materializer = ActorMaterializer() | |
def main(args: Array[String]): Unit = { | |
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder => | |
import Helper._ | |
import GraphDSL.Implicits._ | |
// source to create Patients' Record | |
val outletPatientRecord: Outlet[PatientRecord] = builder.add(Source(1 to 200) map (_ => PatientRecord(randomAge, randomGender, HealthIndex(randomIndex)))).out | |
// filters invalid record | |
val filter = Flow[PatientRecord].filter(p => isIndexInRange(p.index.value)) | |
// broadcasts stream to calculate distribution by age and gender | |
val broadcaster = builder.add(Broadcast[PatientRecord](2)) | |
// sink to calculate age distribution | |
val inletByAge: Inlet[PatientRecord] = builder.add(Sink.fromGraph(new DistributionByAge)).in | |
// sink to calculate gender distribution | |
val inletByGender: Inlet[PatientRecord] = builder.add(Sink.fromGraph(new DistributionByGender)).in | |
outletPatientRecord ~> filter ~> broadcaster | |
inletByAge <~ broadcaster | |
inletByGender <~ broadcaster | |
ClosedShape | |
}) | |
g.run() | |
} | |
object Gender extends Enumeration { | |
type Gender = Value | |
val MALE, FEMALE = Value | |
} | |
object LifeStage extends Enumeration { | |
type LifeStage = Value | |
val CHILD, ADULT, OLD = Value | |
} | |
case class HealthIndex(value: Int) | |
case class PatientRecord(age: Int, gender: Gender, index: HealthIndex) | |
object Helper { | |
val marginIndex = 3 | |
val random = new Random() | |
val maxAge = 88 | |
val minIndex = 0 | |
val maxIndex = 5 | |
def lifeStage(age: Int) = age match { | |
case x if x <= 20 => LifeStage.CHILD | |
case x if x >= 21 && x <= 45 => LifeStage.ADULT | |
case x if x >= 46 => LifeStage.OLD | |
} | |
def isChild(age: Int):Boolean = age <= 20 | |
def isAdult(age: Int):Boolean = age >= 21 && age <= 45 | |
def isOldAge(age: Int): Boolean = age >= 46 | |
def isIndexFailed(x: Int) = x > marginIndex | |
def isIndexInRange(idx: Int) = idx >= minIndex && idx <= maxIndex | |
def randomAge = random.nextInt(maxAge) | |
def randomGender:Gender = Gender(random.nextInt(Gender.maxId)) | |
def randomIndex = random.nextInt(maxIndex * 2) // should be filter | |
} | |
class DistributionByAge extends GraphStage[SinkShape[PatientRecord]] { | |
val in: Inlet[PatientRecord] = Inlet("DistributionByAge") | |
override val shape: SinkShape[PatientRecord] = SinkShape.of(in) | |
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { | |
private var numChildFailed = 0 | |
private var numChildTotal = 0 | |
private var numAdultFailed = 0 | |
private var numAdultTotal = 0 | |
private var numOldAgeFailed = 0 | |
private var numOldAgeTotal = 0 | |
private var totalCount = 0 | |
override def preStart(): Unit = pull(in) | |
setHandler(in, new InHandler { | |
override def onPush(): Unit = { | |
import Helper._ | |
val elem = grab(in) | |
lifeStage(elem.age) match { | |
case LifeStage.CHILD => | |
numChildTotal += 1 | |
if (isIndexFailed(elem.index.value)) numChildFailed += 1 | |
case LifeStage.ADULT => | |
numAdultTotal += 1 | |
if (isIndexFailed(elem.index.value)) numAdultFailed += 1 | |
case LifeStage.OLD => | |
numOldAgeTotal += 1 | |
if (isIndexFailed(elem.index.value)) numOldAgeFailed += 1 | |
} | |
totalCount += 1 | |
pull(in) | |
} | |
override def onUpstreamFinish(): Unit = { | |
println(s"Distribution by Age [Total: $totalCount]:") | |
println(s"\tChild: $numChildFailed/$numChildTotal") | |
println(s"\tAdult: $numAdultFailed/$numAdultTotal") | |
println(s"\tOld: $numOldAgeFailed/$numOldAgeTotal") | |
completeStage() | |
} | |
override def onUpstreamFailure(ex: Throwable): Unit = { | |
println(s"Upstream Failed: ${ex.getMessage}") | |
failStage(ex) | |
} | |
}) | |
} | |
} | |
class DistributionByGender extends GraphStage[SinkShape[PatientRecord]] { | |
val in: Inlet[PatientRecord] = Inlet("DistributionByAge") | |
override val shape: SinkShape[PatientRecord] = SinkShape.of(in) | |
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { | |
private var numFemaleFailed = 0 | |
private var numFemaleTotal = 0 | |
private var numMaleFailed = 0 | |
private var numMaleTotal = 0 | |
private var totalCount = 0 | |
override def preStart(): Unit = pull(in) | |
setHandler(in, new InHandler { | |
override def onPush(): Unit = { | |
import Helper._ | |
grab(in) match { | |
case PatientRecord(_, Gender.FEMALE, idx) => | |
numFemaleTotal += 1 | |
if (isIndexFailed(idx.value)) numFemaleFailed += 1 | |
case PatientRecord(_, Gender.MALE, idx) => | |
numMaleTotal += 1 | |
if (isIndexFailed(idx.value)) numMaleFailed += 1 | |
case _ => | |
} | |
totalCount += 1 | |
pull(in) | |
} | |
override def onUpstreamFinish(): Unit = { | |
println(s"Distribution by Gender [Total: $totalCount]:") | |
println(s"\tFemale: $numFemaleFailed/$numFemaleTotal") | |
println(s"\tMale: $numMaleFailed/$numMaleTotal") | |
completeStage() | |
} | |
override def onUpstreamFailure(ex: Throwable): Unit = { | |
println(s"Upstream Failed: ${ex.getMessage}") | |
failStage(ex) | |
} | |
}) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment