Last active
January 17, 2024 00:58
-
-
Save andyczerwonka/23da899e21d3f69618360024038e4be4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.fury.Fury | |
import io.fury.config.Language | |
import org.junit.Test | |
import org.scalatest.Assertions._ | |
import java.io.ByteArrayInputStream | |
import java.io.ByteArrayOutputStream | |
import java.util.Base64 | |
import java.util.zip.GZIPInputStream | |
import java.util.zip.GZIPOutputStream | |
import scala.util.Try | |
import scala.util.Success | |
import scala.concurrent.Future | |
import scala.concurrent.ExecutionContext | |
import java.util.concurrent.ThreadLocalRandom | |
import com.typesafe.scalalogging.StrictLogging | |
import scala.concurrent.Await | |
import io.fury.ThreadLocalFury | |
import scala.annotation.nowarn | |
// What makes this fail is the nested collection in the case class. If you change it to
// a 1-dimensional collection, we no longer see the exception.
/** Payload that the serde tests round-trip through serialization.
  *
  * @param label human-readable tag; the tests use it in log messages
  * @param data  a sequence of integer sequences (a two-level nested collection)
  */
case class SampleData(
    label: String,
    data: Seq[Seq[Int]]
)
/** Round-trip tests for [[SampleData]]: the value is serialized with Fury, gzip-compressed and
  * Base64-encoded by [[encode]], then taken back through the inverse steps by [[decode]].
  *
  * One test runs the round trip on the calling thread, the other runs the encode step inside
  * a `Future` on the global execution context.
  */
@nowarn
class SerdeThreadingTest extends StrictLogging {

  /** Starts a Fury builder carrying the option set shared by every Fury instance in this class.
    *
    * This is a `def` so that each caller gets its own builder from `Fury.builder()`.
    */
  private def configuredBuilder =
    Fury
      .builder()
      .withLanguage(Language.JAVA)
      .requireClassRegistration(false)
      .withScalaOptimizationEnabled(true)
      .withRefTracking(true)
      .withStringCompressed(true)
      .withLongCompressed(true)
      .withIntCompressed(true)
      .withAsyncCompilation(true)

  /** Creates a new `ThreadLocalFury` whose per-class-loader factory uses the shared option set.
    *
    * Every call constructs a new wrapper. Nothing in this class calls it; it is kept as an
    * alternative to the `fury` field.
    */
  def threadLocalFury: ThreadLocalFury =
    new ThreadLocalFury(classLoader => {
      configuredBuilder
        .withClassLoader(classLoader)
        .build()
    })

  // Single instance shared by encode/decode on whichever thread calls them.
  // NOTE(review): presumably safe for concurrent use given `buildThreadSafeFury` - confirm
  // against the Fury documentation.
  private val fury = configuredBuilder.buildThreadSafeFury()

  /** Serializes `sampleData` with Fury, gzips the bytes and returns them Base64-encoded.
    *
    * Sleeps for a random 500-999 ms before returning (see [[sleepBetween]]).
    *
    * @param sampleData the value to encode
    * @return Base64 text of the gzip-compressed Fury bytes
    */
  def encode(sampleData: SampleData): String = {
    val raw = fury.serialize(sampleData)
    val bos = new ByteArrayOutputStream(raw.length)
    val zos = new GZIPOutputStream(bos)
    // close() finishes the gzip stream, so it must run before bos.toByteArray; doing it in
    // `finally` also closes the stream when write() throws.
    try zos.write(raw)
    finally zos.close()
    sleepBetween(500, 1000)
    Base64.getEncoder.encodeToString(bos.toByteArray)
  }

  /** Reverses [[encode]]: Base64-decodes, gunzips and deserializes with Fury.
    *
    * Sleeps for a random 500-999 ms after a successful deserialization (see [[sleepBetween]]).
    *
    * @param encoded text produced by [[encode]]
    * @return `Success` with the decoded value, or `Failure` holding whatever non-fatal
    *         exception was thrown while decoding, decompressing, deserializing or casting
    */
  def decode(encoded: String): Try[SampleData] =
    Try {
      val zis = new GZIPInputStream(new ByteArrayInputStream(Base64.getDecoder.decode(encoded)))
      val result =
        try fury.deserialize(zis.readAllBytes()).asInstanceOf[SampleData]
        finally zis.close() // also runs when reading or deserializing fails
      sleepBetween(500, 1000)
      result
    }

  // The threading is a red herring as this simple test fails as well
  /** Encodes and decodes one value on the calling thread and expects the original back. */
  @Test
  def testNonThreadedSerde(): Unit = {
    val data = SampleData("single sample", Seq.empty)
    val encoded = encode(data)
    val decoded = decode(encoded)
    assert(decoded == Success(data))
  }

  /** Encodes inside `Future`s on the global execution context, decodes once they have all
    * completed, and expects one decoded value per task.
    */
  @Test
  def testNestedCollectionThreadedSerde(): Unit = {
    import scala.concurrent.duration._
    implicit val ec: ExecutionContext = ExecutionContext.global

    // Number of concurrent encode tasks; the final assertion is derived from the same value.
    val taskCount = 1

    val tasks = for (i <- 1 to taskCount) yield Future {
      val data = SampleData(i.toString, Seq.empty)
      logger.info(s"Start encoding ${data.label}")
      val encoded = encode(data)
      logger.info(s"End encoding ${data.label}")
      encoded
    }

    val decodedFuture = Future.sequence(tasks).map { allEncoded =>
      allEncoded.map { encoded =>
        logger.info("Start decoding...")
        // .get rethrows the original decoding exception instead of hiding it behind a
        // MatchError from a refutable `val Success(x) = ...` pattern.
        val decoded = decode(encoded).get
        logger.info(s"End decoding ${decoded.label}")
        decoded
      }
    }

    val result = Await.result(decodedFuture, 20.seconds)
    assert(result.size == taskCount)
  }

  /** Blocks the current thread for a random number of milliseconds in `[min, max)`.
    *
    * @param min inclusive lower bound in milliseconds
    * @param max exclusive upper bound in milliseconds; must be greater than `min`, otherwise
    *            `ThreadLocalRandom.nextInt` throws `IllegalArgumentException`
    */
  def sleepBetween(min: Int, max: Int): Unit = {
    val sleepTime = ThreadLocalRandom.current().nextInt(min, max)
    Thread.sleep(sleepTime.toLong)
  }
}
Also note, the threading is a red herring. It fails the same way in a simple, single-threaded test. https://gist.github.com/andyczerwonka/23da899e21d3f69618360024038e4be4#file-serdethreadingtest-scala-L81-L88
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Could you use ThreadSafeFury instead? It seems that you are using the same Fury instance from multiple threads.