Created
July 11, 2025 18:33
-
-
Save calvinlfer/830afd0019a0cf7babc09d6b893ce896 to your computer and use it in GitHub Desktop.
Circe JsonSchema + Confluent Schema Registry based Serdes for Scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.Resource | |
import com.fasterxml.jackson.databind.JsonNode | |
import fs2.kafka.* | |
import io.circe.Decoder | |
import io.circe.jackson.jacksonToCirce | |
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient} | |
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer | |
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializerConfig.* | |
import zio.* | |
import zio.interop.catz.* | |
import scala.jdk.CollectionConverters.* | |
/** Builds fs2-kafka deserializers that read payloads through Confluent's
  * `KafkaJsonSchemaDeserializer` and then decode the resulting Jackson tree
  * into `A` with the in-scope Circe `Decoder`.
  */
object JsonSchemaDeserializer:

  /** Deserializer for record keys.
    *
    * @param config Schema Registry connection settings
    * @return a resource whose release closes the underlying Schema Registry client
    */
  def forKey[A: Decoder](
    config: SchemaRegistryConfig
  ): Resource[Task, KeyDeserializer[Task, A]] =
    create[A](config, isKey = true)

  /** Deserializer for record values.
    *
    * @param config Schema Registry connection settings
    * @return a resource whose release closes the underlying Schema Registry client
    */
  def forValue[A: Decoder](
    config: SchemaRegistryConfig
  ): Resource[Task, ValueDeserializer[Task, A]] =
    create[A](config, isKey = false)

  /** Shared implementation behind [[forKey]] and [[forValue]].
    *
    * @param isKey selects the key or value type setting and is forwarded to `configure`
    */
  private def create[A: Decoder](
    config: SchemaRegistryConfig,
    isKey: Boolean
  ): Resource[Task, Deserializer[Task, A]] =
    // Only the property name differs between keys and values; the target type is always JsonNode.
    val targetTypeKey = if isKey then JSON_KEY_TYPE else JSON_VALUE_TYPE

    val settings = Map(
      "schema.registry.url"   -> config.url, // required prop for the deserializer
      FAIL_UNKNOWN_PROPERTIES -> false,
      FAIL_INVALID_SCHEMA     -> false
    ) + (targetTypeKey -> classOf[JsonNode].getName())

    val openClient: Task[SchemaRegistryClient] =
      ZIO.attempt(
        new CachedSchemaRegistryClient(config.url, config.cacheCapacity, config.providers.asJava, Map.empty.asJava)
      )

    // A failure while closing is logged and otherwise ignored.
    def closeClient(client: SchemaRegistryClient): Task[Unit] =
      ZIO.attempt(client.close()).ignoreLogged

    Resource
      .make(openClient)(closeClient)
      .evalMap: client =>
        // Construction and configuration are both captured, so either one throwing fails the Task.
        ZIO.attempt:
          val underlying = new KafkaJsonSchemaDeserializer[JsonNode](client)
          underlying.configure(settings.asJava, isKey)
          underlying
      .map: underlying =>
        Deserializer.instance: (topic, headers, bytes) =>
          ZIO
            .attempt(underlying.deserialize(topic, headers.asJava, bytes))
            // A Circe decoding failure is surfaced as a failure of the Task.
            .flatMap(node => ZIO.fromEither(jacksonToCirce(node).as[A]))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.* | |
import com.fasterxml.jackson.databind.JsonNode | |
import fs2.kafka.* | |
import io.circe.Encoder | |
import io.circe.jackson.circeToJackson | |
import io.circe.syntax.* | |
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient} | |
import io.confluent.kafka.schemaregistry.json.{JsonSchema, JsonSchemaUtils} | |
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer | |
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerConfig.* | |
import zio.* | |
import zio.interop.catz.* | |
import scala.jdk.CollectionConverters.* | |
/** Builds fs2-kafka serializers that encode `A` to JSON with Circe, wrap it in
  * an envelope together with the supplied `JsonSchema`, and hand it to
  * Confluent's `KafkaJsonSchemaSerializer`.
  */
object JsonSchemaSerializer:

  /** Serializer for record keys.
    *
    * @param schema JSON Schema attached to every payload via `JsonSchemaUtils.envelope`
    * @param config Schema Registry connection settings
    * @return a resource whose release closes the underlying Schema Registry client
    */
  def forKey[A: Encoder](
    schema: JsonSchema,
    config: SchemaRegistryConfig
  ): Resource[Task, KeySerializer[Task, A]] =
    create[A](schema, config, isKey = true)

  /** Serializer for record values.
    *
    * @param schema JSON Schema attached to every payload via `JsonSchemaUtils.envelope`
    * @param config Schema Registry connection settings
    * @return a resource whose release closes the underlying Schema Registry client
    */
  def forValue[A: Encoder](
    schema: JsonSchema,
    config: SchemaRegistryConfig
  ): Resource[Task, ValueSerializer[Task, A]] =
    create[A](schema, config, isKey = false)

  /** Shared implementation behind [[forKey]] and [[forValue]].
    *
    * @param schema JSON Schema attached to every payload
    * @param config Schema Registry connection settings
    * @param isKey  forwarded to the underlying serializer's `configure`
    */
  def create[A: Encoder](
    schema: JsonSchema,
    config: SchemaRegistryConfig,
    isKey: Boolean
  ): Resource[Task, Serializer[Task, A]] =
    val clientResource: Resource[Task, SchemaRegistryClient] =
      // Pass the configured schema providers explicitly so this client is built the same way
      // as the one in JsonSchemaDeserializer, instead of relying on the client's defaults.
      val acquire = ZIO.attempt(
        new CachedSchemaRegistryClient(
          config.url,
          config.cacheCapacity,
          config.providers.asJava,
          Map.empty[String, Any].asJava
        )
      )
      // A failure while closing is logged and otherwise ignored.
      val release = (client: SchemaRegistryClient) => ZIO.attempt(client.close()).ignoreLogged
      Resource.make(acquire)(release)

    val underlyingProperties = Map(
      "schema.registry.url"   -> config.url,
      FAIL_UNKNOWN_PROPERTIES -> false,
      FAIL_INVALID_SCHEMA     -> false
    ).asJava

    clientResource
      .evalMap: client =>
        ZIO.attempt:
          val underlying = KafkaJsonSchemaSerializer[JsonNode](client, underlyingProperties)
          // The explicit configure call is what passes isKey to the serializer.
          underlying.configure(underlyingProperties, isKey)
          underlying
      .map: underlying =>
        Serializer
          .delegate[Task, JsonNode](underlying)
          .contramap[A]: a =>
            val payload = circeToJackson(a.asJson)
            JsonSchemaUtils.envelope(schema, payload)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.confluent.kafka.schemaregistry.SchemaProvider | |
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider | |
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider | |
import zio.Config | |
/** Settings used to build Schema Registry clients.
  *
  * @param url           base URL of the Schema Registry
  * @param cacheCapacity capacity handed to the caching registry client
  */
final case class SchemaRegistryConfig(url: SchemaRegistryUrl, cacheCapacity: CacheCapacity):
  /** Schema providers offered to the registry client: JSON Schema first, then Avro. */
  val providers: List[SchemaProvider] =
    new JsonSchemaProvider() :: new AvroSchemaProvider() :: Nil
/** Schema Registry URL, represented as an opaque subtype of `String`. */
opaque type SchemaRegistryUrl <: String = String

object SchemaRegistryUrl:
  /** Validating constructor: accepts any non-empty string.
    *
    * @return `Right` with the URL, or `Left` with an error message when `in` is empty
    */
  def make(in: String): Either[String, SchemaRegistryUrl] =
    if in.isEmpty then Left("Schema Registry URL must be specified")
    else Right(in)

  /** Reads `KAFKA_SCHEMA_REGISTRY_URL`, validates it with [[make]], and falls back to a default URL. */
  val config: Config[SchemaRegistryUrl] =
    Config
      .string("KAFKA_SCHEMA_REGISTRY_URL")
      .mapOrFail { raw =>
        make(raw).left.map(reason => Config.Error.InvalidData(message = reason))
      }
      .withDefault("http://schema-registry.be-central.aws-us-east-1.gpt.czrs.io")
/** Schema Registry client cache capacity, represented as an opaque subtype of `Int`. */
opaque type CacheCapacity <: Int = Int

object CacheCapacity:
  /** Validating constructor: accepts strictly positive values only.
    *
    * @return `Right` with the capacity, or `Left` with an error message when `in` is zero or negative
    */
  def make(in: Int): Either[String, CacheCapacity] =
    if in > 0 then Right(in)
    else Left("Schema Registry Cache Capacity must be specified and greater than 0")

  /** Reads `KAFKA_SCHEMA_REGISTRY_CACHE_CAPACITY`, validates it with [[make]], and defaults to 1024. */
  val config: Config[CacheCapacity] =
    Config
      .int("KAFKA_SCHEMA_REGISTRY_CACHE_CAPACITY")
      .mapOrFail { raw =>
        make(raw).left.map(reason => Config.Error.InvalidData(message = reason))
      }
      .withDefault(1024)
object SchemaRegistryConfig:
  /** Combines the URL and cache-capacity configs into a single [[SchemaRegistryConfig]]. */
  val config: Config[SchemaRegistryConfig] =
    val urlAndCapacity = SchemaRegistryUrl.config ++ CacheCapacity.config
    urlAndCapacity.map((url, capacity) => SchemaRegistryConfig(url, capacity))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Alternative that skips the schema registry entirely:
The first 5 bytes of each serialized message hold the magic byte plus the schema ID, so they can be dropped and the remaining bytes parsed as plain JSON.
