Created
May 28, 2019 13:22
-
-
Save notxcain/5bf4d0757b650e0a699c4545114f2f0e to your computer and use it in GitHub Desktop.
Alpakka Kafka Fs2 interop
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package aecor.kafkadistributedprocessing | |
import java.time.Duration | |
import java.util | |
import java.util.concurrent.Executors | |
import aecor.util.effect._ | |
import akka.NotUsed | |
import akka.kafka.ConsumerMessage.PartitionOffset | |
import akka.kafka.scaladsl.Consumer | |
import akka.kafka.{ AutoSubscription, ConsumerSettings } | |
import akka.stream.Materializer | |
import akka.stream.scaladsl.{ Keep, Source, Sink => AkkaSink } | |
import cats.effect.concurrent.{ Deferred, Ref } | |
import cats.effect.{ ConcurrentEffect, ContextShift, ExitCase, IO, Timer } | |
import cats.implicits._ | |
import cats.effect.implicits._ | |
import cats.~> | |
import fs2.Stream | |
import fs2.concurrent.Queue | |
import fs2.interop.reactivestreams._ | |
import org.apache.kafka.clients.consumer.{ | |
ConsumerRebalanceListener, | |
ConsumerRecord, | |
Consumer => KafkaConsumer | |
} | |
import org.apache.kafka.common | |
import scala.concurrent.duration._ | |
import scala.collection.JavaConverters._ | |
import scala.concurrent.{ ExecutionContext, Future } | |
/** Interop between Alpakka Kafka (Akka Streams) and fs2.
  *
  * Besides converting Akka `Source`s into fs2 `Stream`s, this object offers [[assignPartitions]],
  * a stream of partition assignments driven by a plain Kafka consumer whose rebalance callbacks
  * are bridged into fs2.
  */
object interop {

  /** Materializes `source` and exposes it as an fs2 stream.
    *
    * The Akka graph is run (materialized) when the single element of the outer stream is
    * evaluated. That element pairs the source's materialized value with a stream of the source's
    * elements, read through a reactive-streams publisher.
    *
    * NOTE(review): the publisher is created with `fanout = false`, so it accepts a single
    * subscriber; running the inner stream more than once is expected to fail.
    */
  def sourceToStream[F[_], A, Mat](source: Source[A, Mat], materializer: Materializer)(
    implicit F: ConcurrentEffect[F]
  ): Stream[F, (Mat, Stream[F, A])] =
    Stream
      .eval(F.delay {
        source
          .toMat(AkkaSink.asPublisher(false))(Keep.both)
          .run()(materializer)
      })
      .map {
        case (mat, publisher) =>
          (mat, publisher.toStream[F]())
      }

  /** One partition of `topic` together with the stream of its messages. */
  final case class TopicPartition[F[_], A](topic: String, partition: Int, messages: Stream[F, A])

  /** A consumed record paired with the effect that commits its offset. */
  final case class CommittableMessage[F[_], K, V](record: ConsumerRecord[K, V],
                                                  committableOffset: CommittableOffset[F])

  /** An offset that is committed by running `commit`. */
  final case class CommittableOffset[F[_]](commit: F[Unit], partitionOffset: PartitionOffset)

  /** A partition assigned to this consumer by a rebalance.
    *
    * @param partition       the assigned partition number
    * @param partitionCount  number of partitions of the topic, fetched once when subscribing
    * @param watchRevocation completes when the partition is revoked, yielding an acknowledgement
    *                        action; the rebalance does not proceed until that action is run
    */
  final case class AssignedPartition[F[_]](partition: Int,
                                           partitionCount: Int,
                                           watchRevocation: F[F[Unit]])

  /** A rebalance event forwarded from a Kafka rebalance callback into fs2.
    *
    * `commit` must be run once the command has been handled: the callback that produced the
    * command blocks until it is.
    */
  sealed abstract class RebalanceCommand[F[_]]
  object RebalanceCommand {
    final case class RevokePartitions[F[_]](partitions: Set[Int], commit: F[Unit])
        extends RebalanceCommand[F]
    final case class AssignPartitions[F[_]](partitions: Set[Int], commit: F[Unit])
        extends RebalanceCommand[F]
  }

  /** A `ConsumerRebalanceListener` that turns each callback into a [[RebalanceCommand]].
    *
    * Every callback enqueues a command via `enqueue` and then blocks the calling thread (through
    * `unsafeRunSync`) until the command's `commit` token has been run by the queue's consumer.
    * This lets fs2 code finish assignment/revocation work before the callback returns to Kafka.
    */
  final class EnqueueingRebalanceListener[F[_]](enqueue: RebalanceCommand[F] => F[Unit])(
    implicit F: ConcurrentEffect[F]
  ) extends ConsumerRebalanceListener {

    /** Enqueues `f(token)` and semantically blocks until `token` has been run. */
    private def enqueueWithCompletionToken(f: F[Unit] => RebalanceCommand[F]): F[Unit] =
      Deferred[F, Unit].flatMap(completion => enqueue(f(completion.complete(()))) >> completion.get)

    override def onPartitionsRevoked(partitions: util.Collection[common.TopicPartition]): Unit =
      F.toIO(
          enqueueWithCompletionToken(
            RebalanceCommand.RevokePartitions(partitions.asScala.map(_.partition()).toSet, _)
          )
        )
        .unsafeRunSync()

    override def onPartitionsAssigned(partitions: util.Collection[common.TopicPartition]): Unit =
      F.toIO(
          enqueueWithCompletionToken(
            RebalanceCommand.AssignPartitions(partitions.asScala.map(_.partition()).toSet, _)
          )
        )
        .unsafeRunSync()
  }

  object EnqueueingRebalanceListener {
    final class UsePartiallyApplied[F[_]] {

      /** Creates an unbounded command queue, hands a listener feeding it to `subscribe`, and
        * returns `subscribe`'s result together with the stream of enqueued commands.
        */
      def use[A](
        subscribe: ConsumerRebalanceListener => F[A]
      )(implicit F: ConcurrentEffect[F]): F[(A, Stream[F, RebalanceCommand[F]])] =
        for {
          queue <- Queue.unbounded[F, RebalanceCommand[F]]
          listener = new EnqueueingRebalanceListener[F](queue.enqueue1)
          a <- subscribe(listener)
        } yield (a, queue.dequeue)
    }

    def apply[F[_]]: UsePartiallyApplied[F] = new UsePartiallyApplied[F]
  }

  /** Creates a Kafka consumer and confines every operation on it to one dedicated thread.
    *
    * The emitted function runs the given operation against the consumer on that thread, which
    * serializes all access. When the stream finalizes, the consumer is closed on the same thread
    * and the thread's executor is then shut down (inner bracket is released before the outer).
    */
  private def consumerAccess[F[_], K, V](consumerSettings: ConsumerSettings[K, V])(
    implicit F: ConcurrentEffect[F],
    contextShift: ContextShift[F]
  ): Stream[F, (KafkaConsumer[K, V] => ?) ~> F] =
    Stream
      .bracket(F.delay(Executors.newSingleThreadExecutor()))(
        executor => F.delay(executor.shutdown())
      )
      .flatMap { executor =>
        val consumerThread = ExecutionContext.fromExecutor(executor)

        def onConsumerThread[A](thunk: => A): F[A] =
          contextShift.evalOn(consumerThread) {
            F.async[A] { cb =>
              executor.execute(new Runnable {
                override def run(): Unit =
                  cb {
                    // Deliberately broad: the callback must fire for every outcome, otherwise
                    // the awaiting effect would never complete.
                    try Right(thunk)
                    catch {
                      case e: Throwable => Left(e)
                    }
                  }
              })
            }
          }

        Stream
          .bracket(F.delay(consumerSettings.createKafkaConsumer()))(
            consumer => onConsumerThread(consumer.close())
          )
          .map { consumer =>
            new ((KafkaConsumer[K, V] => ?) ~> F) {
              override def apply[A](f: KafkaConsumer[K, V] => A): F[A] =
                onConsumerThread(f(consumer))
            }
          }
      }

  /** Subscribes a dedicated Kafka consumer to `topic` and emits every partition assigned to it.
    *
    * The consumer is polled with a 50 ms timeout, pausing 500 ms between polls; returned records
    * are discarded. Polling is what lets Kafka invoke the rebalance callbacks, which are bridged
    * through an [[EnqueueingRebalanceListener]]:
    *  - on assignment, one [[AssignedPartition]] is emitted per newly assigned partition;
    *  - on revocation, the watcher of each revoked partition is signalled through
    *    `watchRevocation`, one partition at a time, and the rebalance does not proceed until each
    *    watcher has run the acknowledgement action it received.
    *
    * The consumer and its dedicated thread are released when the stream terminates.
    */
  def assignPartitions[F[_], K, V](consumerSettings: ConsumerSettings[K, V], topic: String)(
    implicit F: ConcurrentEffect[F],
    timer: Timer[F],
    contextShift: ContextShift[F]
  ): Stream[F, AssignedPartition[F]] =
    consumerAccess(consumerSettings).flatMap { accessConsumer =>
      val fetchPartitionCount = accessConsumer(_.partitionsFor(topic).size())
      def subscribe(listener: ConsumerRebalanceListener) =
        accessConsumer(_.subscribe(List(topic).asJava, listener))
      val poll = accessConsumer(_.poll(Duration.ofMillis(50))).void

      Stream
        .eval(
          EnqueueingRebalanceListener[F]
            .use(listener => subscribe(listener) >> fetchPartitionCount)
        )
        .flatMap {
          case (pc, partitions) =>
            partitions
              .concurrently(Stream.repeatEval(poll >> timer.sleep(500.millis)))
              // State: (partitions assigned by the latest command, revocation callback per
              // currently assigned partition).
              .evalScan((List.empty[AssignedPartition[F]], Map.empty[Int, F[Unit] => F[Unit]])) {
                case ((_, revocationCallbacks), command) =>
                  command match {
                    case RebalanceCommand.RevokePartitions(partitions, commit) =>
                      // Signal each revoked partition and wait for its acknowledgement before
                      // letting the rebalance continue.
                      partitions.toList
                        .traverse_ { partition =>
                          revocationCallbacks.get(partition).traverse { callback =>
                            Deferred[F, Unit].flatMap { revocationCompletion =>
                              callback(revocationCompletion.complete(())) >> revocationCompletion.get
                            }
                          }
                        } >> commit.as((List.empty, revocationCallbacks -- partitions))
                    case RebalanceCommand.AssignPartitions(partitions, commit) =>
                      partitions.toList
                        .traverse { p =>
                          Deferred[F, F[Unit]].flatMap { revocation =>
                            Ref[F].of(false).map { cancelled =>
                              (
                                AssignedPartition(p, pc, revocation.get.guaranteeCase {
                                  case ExitCase.Canceled => cancelled.set(true)
                                  case _                 => ().pure[F]
                                }),
                                // If the watcher was cancelled, acknowledge immediately;
                                // otherwise hand the acknowledgement to the watcher.
                                // NOTE(review): a watcher cancelled between `cancelled.get` and
                                // `revocation.complete`, or one that never runs
                                // `watchRevocation`, leaves the acknowledgement unrun and blocks
                                // the rebalance until `watchRevocation` is evaluated again.
                                (complete: F[Unit]) =>
                                  cancelled.get.ifM(complete, revocation.complete(complete))
                              )
                            }
                          }
                        }
                        .flatMap { list =>
                          val assignedPartitions = list.map(_._1)
                          val updatedRevocationCallbacks = revocationCallbacks ++ list.map(
                            x => (x._1.partition, x._2)
                          )
                          commit.as((assignedPartitions, updatedRevocationCallbacks))
                        }
                  }
              }
        }
        .flatMap {
          case (assignedPartitions, _) =>
            Stream.emits(assignedPartitions)
        }
    }

  /** Drains the Alpakka consumer behind `control`, then shuts it down; completes when done. */
  private def drainAndShutdown[F[_]](control: Consumer.Control, materializer: Materializer)(
    implicit F: ConcurrentEffect[F]
  ): F[Unit] =
    F.liftIO(
        IO.fromFuture(
          IO(control.drainAndShutdown(Future.successful(()))(materializer.executionContext))
        )
      )
      .void

  /** Alpakka's committable partitioned source as an fs2 stream of partitions.
    *
    * Each message exposes its offset commit as an effect. The underlying consumer is drained and
    * shut down when the outer stream finalizes.
    */
  def committablePartitionedStream[F[_], K, V](
    consumerSettings: ConsumerSettings[K, V],
    subscription: AutoSubscription,
    materializer: Materializer
  )(implicit F: ConcurrentEffect[F]): Stream[F, TopicPartition[F, CommittableMessage[F, K, V]]] =
    Consumer
      .committablePartitionedSource(consumerSettings, subscription)
      .toStream[F](materializer)
      .flatMap {
        case (control, stream) =>
          stream.onFinalize(drainAndShutdown(control, materializer)).map {
            case (tp, source) =>
              val messages = source.toStream[F](materializer).map { cm =>
                CommittableMessage(
                  cm.record,
                  CommittableOffset(
                    F.fromFuture(cm.committableOffset.commitScaladsl()).void,
                    cm.committableOffset.partitionOffset
                  )
                )
              }
              TopicPartition(tp.topic(), tp.partition(), messages)
          }
      }

  /** Alpakka's plain partitioned source as an fs2 stream of partitions.
    *
    * The underlying consumer is drained and shut down when the outer stream finalizes.
    */
  def plainPartitionedStream[F[_], K, V](
    consumerSettings: ConsumerSettings[K, V],
    subscription: AutoSubscription,
    materializer: Materializer
  )(implicit F: ConcurrentEffect[F]): Stream[F, TopicPartition[F, ConsumerRecord[K, V]]] =
    Consumer
      .plainPartitionedSource(consumerSettings, subscription)
      .toStream[F](materializer)
      .flatMap {
        case (control, stream) =>
          stream.onFinalize(drainAndShutdown(control, materializer)).map {
            case (tp, source) =>
              val messages = source.toStream[F](materializer)
              TopicPartition(tp.topic(), tp.partition(), messages)
          }
      }

  /** Syntax: partitioned fs2 streams directly from `ConsumerSettings`. */
  implicit final class ConsumerSettingsOps[K, V](val self: ConsumerSettings[K, V]) extends AnyVal {
    def committablePartitionedStream[F[_]: ConcurrentEffect](
      subscription: AutoSubscription,
      materializer: Materializer
    ): Stream[F, TopicPartition[F, CommittableMessage[F, K, V]]] =
      interop.committablePartitionedStream(self, subscription, materializer)

    def plainPartitionedStream[F[_]: ConcurrentEffect](
      subscription: AutoSubscription,
      materializer: Materializer
    ): Stream[F, TopicPartition[F, ConsumerRecord[K, V]]] =
      interop.plainPartitionedStream(self, subscription, materializer)
  }

  /** Syntax: `source.toStream[F](materializer)` yielding the materialized value and elements. */
  implicit final class SourceToStreamOps[A, Mat](val source: Source[A, Mat]) extends AnyVal {
    def toStream[F[_]](
      materializer: Materializer
    )(implicit F: ConcurrentEffect[F]): Stream[F, (Mat, Stream[F, A])] =
      sourceToStream(source, materializer)
  }

  /** Syntax for `NotUsed`-materialized sources: the materialized value is dropped and the
    * elements are streamed directly. More specific than [[SourceToStreamOps]], so it is the one
    * selected for such sources.
    */
  implicit final class SourceWithNotUsedMatToStreamOps[A](val source: Source[A, NotUsed])
      extends AnyVal {
    def toStream[F[_]](materializer: Materializer)(implicit F: ConcurrentEffect[F]): Stream[F, A] =
      sourceToStream(source, materializer).flatMap(_._2)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment