Last active
April 25, 2017 14:14
-
-
Save atamborrino/57f9095cb30d70e90d8507602a14b6bd to your computer and use it in GitHub Desktop.
Parallel SCAN per node on a Redis Cluster as an Akka Stream Source
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.NotUsed | |
import akka.stream.scaladsl.Source | |
import redis.clients.jedis.{Jedis, JedisCluster, ScanParams} | |
import scala.collection.JavaConverters._ | |
import scala.concurrent.Future | |
/**
 * Exposes a Redis Cluster `SCAN` as an Akka Streams `Source`, scanning several
 * master nodes at the same time.
 *
 * @param maxNodeParallelism breadth handed to `flatMapMerge`, i.e. the maximum number of
 *                           per-node scans that are merged concurrently
 * @param jedisCluster       cluster handle whose node pools are enumerated and scanned
 * @param blockingEC         wrapper whose `value` becomes the implicit execution context
 *                           for every `Future` created here (presumably a pool meant for
 *                           blocking calls -- confirm against its definition)
 */
class RedisStream(maxNodeParallelism: Int)
                 (implicit jedisCluster: JedisCluster,
                  blockingEC: RedisBlockingEC) {

  implicit val ec = blockingEC.value

  /**
   * Streams every element returned by `SCAN` on each master node of the cluster.
   *
   * Nothing is sent to Redis until the source is materialized: the node discovery is a
   * `lazy val` that is only forced from inside `Source.lazily`.
   *
   * @param scanParams parameters forwarded unchanged to each `SCAN` call
   * @return a source of the scanned elements; results of different nodes are interleaved
   */
  def scan(scanParams: ScanParams): Source[String, NotUsed] = {
    // Redis uses cursor "0" both to start an iteration and to signal that it is complete.
    val startCursor = "0"

    // Probe every known node with INFO replication and keep only those reporting
    // themselves as masters.
    lazy val futMasterNodes = Future(jedisCluster.getClusterNodes.asScala.toMap)
      .flatMap { nodes =>
        Future.traverse(nodes) { case (_, node) =>
          Future {
            ScalaCloseable.withResource(node.getResource()) { conn =>
              val info = conn.info("replication")
              if (info.contains("role:master")) Some(node)
              else None
            }
          }
        }
      }
      .map(_.flatten)

    Source.lazily(() => Source.fromFuture(futMasterNodes))
      .mapConcat(_.toList)
      .flatMapMerge(maxNodeParallelism, { node =>
        // Unfold state: Some(cursor) = next cursor to send, None = iteration finished.
        // The cursor is kept as the opaque string returned by Jedis instead of being
        // parsed to Long: Redis cursors are unsigned 64-bit values, so parsing can throw
        // NumberFormatException for values above Long.MaxValue.
        Source.unfoldAsync[Option[String], List[String]](Some(startCursor)) {
          case None => Future.successful(None)
          case Some(cursor) =>
            Future {
              // A connection is borrowed for a single SCAN call and then closed.
              ScalaCloseable.withResource(node.getResource()) { conn =>
                conn.scan(cursor, scanParams)
              }
            }
            .map { scanResult =>
              val keys = scanResult.getResult.asScala.toList
              val returnedCursor = scanResult.getStringCursor
              val nextState: Option[String] =
                if (returnedCursor == startCursor) None else Some(returnedCursor)
              // Always emit this batch; termination happens on the next unfold step.
              Some(nextState -> keys)
            }
        }
      })
      .mapConcat(_.toList)
      .mapMaterializedValue(_ => NotUsed)
  }
}
/** Loan-pattern helper for `AutoCloseable` resources. */
object ScalaCloseable {

  /**
   * Acquires a resource, applies `f` to it and closes the resource afterwards, whether
   * `f` returns normally or throws.
   *
   * If both `f` and `close()` throw, the exception from `f` is the one propagated and the
   * non-fatal `close()` failure is attached to it as a suppressed exception, so the root
   * cause is not replaced by the clean-up error. If only `close()` throws, that exception
   * propagates.
   *
   * @param a by-name expression producing the resource; evaluated exactly once. If it
   *          throws, nothing is closed. If it yields `null`, `f` is still invoked with
   *          `null` and `close()` is skipped.
   * @param f computation to run with the resource
   * @tparam A resource type
   * @tparam B result type of `f`
   * @return the value returned by `f`
   */
  def withResource[A >: Null <: AutoCloseable, B](a: => A)(f: A => B): B = {
    val resource: A = a
    // Remembers the failure of `f` (if any) so the finally block can tell the cases apart.
    var primary: Throwable = null
    try f(resource)
    catch {
      // Caught only to record it; always rethrown unchanged.
      case t: Throwable =>
        primary = t
        throw t
    } finally {
      if (resource != null) {
        if (primary == null) resource.close()
        else {
          try resource.close()
          catch {
            // The guard avoids addSuppressed's self-suppression IllegalArgumentException.
            case scala.util.control.NonFatal(closeFailure) if closeFailure ne primary =>
              primary.addSuppressed(closeFailure)
          }
        }
      }
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Améliorer la gestion de la connexion :