Created
December 2, 2022 21:21
-
-
Save soujiro32167/4839f66d5ff27a5b446800fcea71b609 to your computer and use it in GitHub Desktop.
Multi-topic sealed trait kafka producer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import _root_.vulcan.Codec | |
import _root_.vulcan.generic.* | |
import foo.Events.{E1, E2} | |
import fs2.kafka.* | |
import fs2.kafka.vulcan.{AvroSettings, SchemaRegistryClientSettings, avroSerializer} | |
import zio.interop.catz.* | |
import zio.interop.catz.implicits.rts | |
import zio.{Scope, Task, ZIO, ZIOAppDefault} | |
| |
object foo extends ZIOAppDefault { | |
sealed trait Events | |
object Events { | |
case class E1(s: String) extends Events | |
case class E2(i: Int) extends Events | |
| |
implicit val e1Codec: Codec[E1] = Codec.derive | |
implicit val e2Codec: Codec[E2] = Codec.derive | |
val derivedCodec: Codec[Events] = Codec.derive | |
implicit val ecodec: Codec[Events] = Codec.instance( | |
derivedCodec.schema, | |
{ | |
case e: E1 => e1Codec.encode(e) | |
case e: E2 => e2Codec.encode(e) | |
}, | |
(a, s) => derivedCodec.decode(a, s) | |
) | |
} | |
| |
| |
val avroSettings: AvroSettings[Task] = | |
AvroSettings { | |
SchemaRegistryClientSettings[Task]("http://localhost:8081") | |
} | |
| |
def serializer[A: Codec](settings: AvroSettings[Task]): Task[Serializer[Task, A]] = | |
settings.createAvroSerializer(false, None).map { | |
case (serializer, _) => | |
Serializer.instance { (topic, _, a: A) => | |
Codec[A].encode(a) match { | |
case Left(e) => ZIO.fail(e.throwable) | |
case Right(v) => ZIO.attempt(serializer.serialize(topic, v)) | |
} | |
} | |
} | |
| |
val producer: ZIO[Scope, Throwable, KafkaProducer[Task, String, Events]] = | |
serializer[Events](avroSettings).flatMap(s => | |
KafkaProducer.resource( | |
ProducerSettings[Task, String, Events]( | |
keySerializer = avroSerializer[String].using(avroSettings), | |
valueSerializer = s | |
).withBootstrapServers("localhost:9092") | |
).toScopedZIO <* ZIO.log("initializing producer") | |
) | |
| |
| |
override def run = { | |
for { | |
p <- producer | |
_ <- p.produce(ProducerRecords.one(ProducerRecord("e1_topic", "k1", E1("something")))) | |
_ <- p.produce(ProducerRecords.one(ProducerRecord("e2_topic", "k2", E2(1)))) | |
} yield () | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment