-
-
Save TanUkkii007/f3b5784df0f8f7c3b144c38279455a09 to your computer and use it in GitHub Desktop.
| package streams | |
| import akka.actor.ActorSystem | |
| import akka.stream._ | |
| import akka.stream.scaladsl._ | |
| import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } | |
| import com.typesafe.config.ConfigFactory | |
/**
 * Demo that runs a small stream whose stages carry different dispatcher
 * attributes and prints the name of the thread each stage executes on.
 *
 * Each println label names the dispatcher the author expected that stage to
 * run on; the thread name printed after it shows where it actually ran.
 *
 * In this variant the "mine" dispatcher attribute is set on the inner source
 * created inside `flatMapConcat`, without an explicit async boundary.
 */
object StreamDispatchers extends App {

  // Three dedicated dispatchers ("another", "yet-another", "mine"), each
  // configured with a fixed thread pool of size 1.
  implicit val system: ActorSystem = ActorSystem("dispatchers", ConfigFactory.parseString(
    """
      another {
        type = Dispatcher
        executor = "thread-pool-executor"
        thread-pool-executor {
          fixed-pool-size = 1
        }
      }
      yet-another {
        type = Dispatcher
        executor = "thread-pool-executor"
        thread-pool-executor {
          fixed-pool-size = 1
        }
      }
      mine {
        type = Dispatcher
        executor = "thread-pool-executor"
        thread-pool-executor {
          fixed-pool-size = 1
        }
      }
    """))

  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // NOTE: despite its name, `source` holds the result of `runForeach`, i.e. the
  // future that completes when the stream has finished. The name is kept so the
  // object's members stay unchanged.
  val source = Source.single(1)
    .flatMapConcat { _ =>
      Source.fromGraph(new TestStage(20)).withAttributes(ActorAttributes.dispatcher("mine"))
    }
    .map { n => println(s"expected default (map): ${Thread.currentThread().getName}"); n }
    .map { n => println(s"expected another: ${Thread.currentThread().getName}"); n }
    .addAttributes(ActorAttributes.dispatcher("another"))
    .map { n => println(s"expected yet-another: ${Thread.currentThread().getName}"); n }
    .addAttributes(ActorAttributes.dispatcher("yet-another"))
    .runForeach(n => println(s"expected default (runForeach): ${Thread.currentThread().getName}"))

  // Shut the actor system down once the stream is done (successfully or not);
  // otherwise its threads keep the JVM running after the demo has finished.
  source.onComplete(_ => system.terminate())(system.dispatcher)
}
/**
 * Source stage that emits the integers `1` to `n - 1` in order and then
 * completes.
 *
 * On every pull it prints the current thread name prefixed with
 * "expected mine: ", which lets the demo show which dispatcher runs the stage.
 *
 * @param n the pull count at which the outlet is completed. The n-th pull
 *          completes the outlet instead of pushing, so `n - 1` elements are
 *          emitted. For `n <= 1` the first pull completes the outlet and
 *          nothing is emitted.
 */
class TestStage(n: Int) extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("TestStage")

  override def shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {
      // Number of pulls received so far; also the next value to emit.
      private var pulls = 0

      override def onPull(): Unit = {
        pulls += 1
        println(s"expected mine: ${Thread.currentThread().getName}")
        // `>=` rather than `==`: with `==` a non-positive `n` is never matched
        // and the stage would keep emitting instead of completing.
        if (pulls >= n) complete(out)
        else push(out, pulls)
      }

      setHandler(out, this)
    }
}
| package streams | |
| import akka.actor.ActorSystem | |
| import akka.stream._ | |
| import akka.stream.scaladsl._ | |
| import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } | |
| import com.typesafe.config.ConfigFactory | |
/**
 * Demo that runs a small stream whose stages carry different dispatcher
 * attributes and prints the name of the thread each stage executes on.
 *
 * Each println label names the dispatcher the author expected that stage to
 * run on; the thread name printed after it shows where it actually ran.
 *
 * In this variant the inner source created inside `flatMapConcat` gets the
 * "mine" dispatcher attribute and is additionally marked with `.async`.
 */
object StreamDispatchers extends App {

  // Three dedicated dispatchers ("another", "yet-another", "mine"), each
  // configured with a fixed thread pool of size 1.
  implicit val system: ActorSystem = ActorSystem("dispatchers", ConfigFactory.parseString(
    """
      another {
        type = Dispatcher
        executor = "thread-pool-executor"
        thread-pool-executor {
          fixed-pool-size = 1
        }
      }
      yet-another {
        type = Dispatcher
        executor = "thread-pool-executor"
        thread-pool-executor {
          fixed-pool-size = 1
        }
      }
      mine {
        type = Dispatcher
        executor = "thread-pool-executor"
        thread-pool-executor {
          fixed-pool-size = 1
        }
      }
    """))

  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // NOTE: despite its name, `source` holds the result of `runForeach`, i.e. the
  // future that completes when the stream has finished. The name is kept so the
  // object's members stay unchanged.
  val source = Source.single(1)
    .flatMapConcat { _ =>
      // Same inner source as the plain flatMapConcat example, plus `.async`.
      Source.fromGraph(new TestStage(20)).withAttributes(ActorAttributes.dispatcher("mine")).async
    }
    .map { n => println(s"expected default (map): ${Thread.currentThread().getName}"); n }
    .map { n => println(s"expected another: ${Thread.currentThread().getName}"); n }
    .addAttributes(ActorAttributes.dispatcher("another"))
    .map { n => println(s"expected yet-another: ${Thread.currentThread().getName}"); n }
    .addAttributes(ActorAttributes.dispatcher("yet-another"))
    .runForeach(n => println(s"expected default (runForeach): ${Thread.currentThread().getName}"))

  // Shut the actor system down once the stream is done (successfully or not);
  // otherwise its threads keep the JVM running after the demo has finished.
  source.onComplete(_ => system.terminate())(system.dispatcher)
}
/**
 * Source stage that emits the integers `1` to `n - 1` in order and then
 * completes.
 *
 * On every pull it prints the current thread name prefixed with
 * "expected mine: ", which lets the demo show which dispatcher runs the stage.
 *
 * @param n the pull count at which the outlet is completed. The n-th pull
 *          completes the outlet instead of pushing, so `n - 1` elements are
 *          emitted. For `n <= 1` the first pull completes the outlet and
 *          nothing is emitted.
 */
class TestStage(n: Int) extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("TestStage")

  override def shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {
      // Number of pulls received so far; also the next value to emit.
      private var pulls = 0

      override def onPull(): Unit = {
        pulls += 1
        println(s"expected mine: ${Thread.currentThread().getName}")
        // `>=` rather than `==`: with `==` a non-positive `n` is never matched
        // and the stage would keep emitting instead of completing.
        if (pulls >= n) complete(out)
        else push(out, pulls)
      }

      setHandler(out, this)
    }
}
| package streams | |
| import akka.actor.ActorSystem | |
| import akka.stream._ | |
| import akka.stream.scaladsl._ | |
| import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } | |
| import com.typesafe.config.ConfigFactory | |
/**
 * Demo that runs a small stream whose stages carry different dispatcher
 * attributes and prints the name of the thread each stage executes on.
 *
 * Each println label names the dispatcher the author expected that stage to
 * run on; the thread name printed after it shows where it actually ran.
 *
 * In this variant the "mine" dispatcher attribute is set on the result of
 * `flatMapConcat` rather than on the inner source created inside it.
 */
object StreamDispatchers extends App {

  // Three dedicated dispatchers ("another", "yet-another", "mine"), each
  // configured with a fixed thread pool of size 1.
  implicit val system: ActorSystem = ActorSystem("dispatchers", ConfigFactory.parseString(
    """
      another {
        type = Dispatcher
        executor = "thread-pool-executor"
        thread-pool-executor {
          fixed-pool-size = 1
        }
      }
      yet-another {
        type = Dispatcher
        executor = "thread-pool-executor"
        thread-pool-executor {
          fixed-pool-size = 1
        }
      }
      mine {
        type = Dispatcher
        executor = "thread-pool-executor"
        thread-pool-executor {
          fixed-pool-size = 1
        }
      }
    """))

  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // NOTE: despite its name, `source` holds the result of `runForeach`, i.e. the
  // future that completes when the stream has finished. The name is kept so the
  // object's members stay unchanged.
  val source = Source.single(1)
    .flatMapConcat { _ =>
      Source.fromGraph(new TestStage(20))
    }
    .withAttributes(ActorAttributes.dispatcher("mine")) // dispatcher attribute attached to the flatMapConcat stage
    .map { n => println(s"expected default (map): ${Thread.currentThread().getName}"); n }
    .map { n => println(s"expected another: ${Thread.currentThread().getName}"); n }
    .addAttributes(ActorAttributes.dispatcher("another"))
    .map { n => println(s"expected yet-another: ${Thread.currentThread().getName}"); n }
    .addAttributes(ActorAttributes.dispatcher("yet-another"))
    .runForeach(n => println(s"expected default (runForeach): ${Thread.currentThread().getName}"))

  // Shut the actor system down once the stream is done (successfully or not);
  // otherwise its threads keep the JVM running after the demo has finished.
  source.onComplete(_ => system.terminate())(system.dispatcher)
}
/**
 * Source stage that emits the integers `1` to `n - 1` in order and then
 * completes.
 *
 * On every pull it prints the current thread name prefixed with
 * "expected mine: ", which lets the demo show which dispatcher runs the stage.
 *
 * @param n the pull count at which the outlet is completed. The n-th pull
 *          completes the outlet instead of pushing, so `n - 1` elements are
 *          emitted. For `n <= 1` the first pull completes the outlet and
 *          nothing is emitted.
 */
class TestStage(n: Int) extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("TestStage")

  override def shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {
      // Number of pulls received so far; also the next value to emit.
      private var pulls = 0

      override def onPull(): Unit = {
        pulls += 1
        println(s"expected mine: ${Thread.currentThread().getName}")
        // `>=` rather than `==`: with `==` a non-positive `n` is never matched
        // and the stage would keep emitting instead of completing.
        if (pulls >= n) complete(out)
        else push(out, pulls)
      }

      setHandler(out, this)
    }
}
//example output from StreamDispatchersWithFlatMapConcatAndAsyncBoundary.scala
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected mine: dispatchers-mine-7
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
// example output from StreamDispatchersWithFlatMapConcatWithDispathcer.scala
expected mine: dispatchers-mine-5
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected another: dispatchers-another-6
expected mine: dispatchers-mine-5
expected mine: dispatchers-mine-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected yet-another: dispatchers-yet-another-7
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected another: dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected mine: dispatchers-mine-5
expected yet-another: dispatchers-yet-another-7
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected mine: dispatchers-mine-5
expected another: dispatchers-another-6
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected mine: dispatchers-mine-5
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-7
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-7
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
//example output from StreamDispatchersWithFlatMapConcat.scala
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected mine: dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected default (map): dispatchers-another-6
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (map): dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (map): dispatchers-another-6
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected another: dispatchers-another-6
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected another: dispatchers-another-6
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected yet-another: dispatchers-yet-another-5
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4
expected default (runForeach): dispatchers-akka.actor.default-dispatcher-4