Skip to content

Instantly share code, notes, and snippets.

@bpholt
Last active November 7, 2018 23:10

Revisions

  1. bpholt revised this gist Nov 7, 2018. 1 changed file with 4 additions and 4 deletions.
    8 changes: 4 additions & 4 deletions Sha256Pipe.scala
    Original file line number Diff line number Diff line change
    @@ -7,10 +7,10 @@ import tsec.common._

    object Sha256Pipe {
    def apply[F[_] : Sync](promisedHexString: Promise[F, String]): Pipe[F, Byte, Byte] = {
    def pull(digest: MessageDigest): Stream[F, Chunk[Byte]] => Pull[F, Byte, String] =
    _.pull.uncons1.flatMap {
    def pull(digest: MessageDigest): Stream[F, Byte] => Pull[F, Byte, String] =
    _.pull.unconsChunk.flatMap {
    case None => Pull.eval(Sync[F].delay(digest.digest().toHexString))
    case Some((c: Chunk[Byte], rest: Stream[F, Chunk[Byte]])) =>
    case Some((c: Chunk[Byte], rest: Stream[F, Byte])) =>
    val bytes = c.toBytes
    for {
    _ <- Pull.eval(Sync[F].delay(digest.update(bytes.values, bytes.offset, bytes.length)))
    @@ -22,7 +22,7 @@ object Sha256Pipe {
    def calculateHashOf(input: Stream[F, Byte]): Pull[F, Byte, Unit] =
    for {
    initialDigest <- Pull.eval(Sync[F].delay(MessageDigest.getInstance("SHA-256")))
    hexString <- pull(initialDigest)(input.chunks)
    hexString <- pull(initialDigest)(input)
    _ <- Pull.eval(promisedHexString.complete(hexString))
    } yield ()

  2. bpholt created this gist Nov 7, 2018.
    52 changes: 52 additions & 0 deletions Sha256Pipe.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,52 @@
    import java.security.MessageDigest

    import cats.effect._
    import fs2._
    import fs2.async.Promise
    import tsec.common._

    object Sha256Pipe {
    def apply[F[_] : Sync](promisedHexString: Promise[F, String]): Pipe[F, Byte, Byte] = {
    def pull(digest: MessageDigest): Stream[F, Chunk[Byte]] => Pull[F, Byte, String] =
    _.pull.uncons1.flatMap {
    case None => Pull.eval(Sync[F].delay(digest.digest().toHexString))
    case Some((c: Chunk[Byte], rest: Stream[F, Chunk[Byte]])) =>
    val bytes = c.toBytes
    for {
    _ <- Pull.eval(Sync[F].delay(digest.update(bytes.values, bytes.offset, bytes.length)))
    _ <- Pull.outputChunk(c)
    hexString <- pull(digest)(rest)
    } yield hexString
    }

    def calculateHashOf(input: Stream[F, Byte]): Pull[F, Byte, Unit] =
    for {
    initialDigest <- Pull.eval(Sync[F].delay(MessageDigest.getInstance("SHA-256")))
    hexString <- pull(initialDigest)(input.chunks)
    _ <- Pull.eval(promisedHexString.complete(hexString))
    } yield ()

    calculateHashOf(_).stream
    }
    }

    object Example extends StreamApp[IO] {
    import scala.concurrent.ExecutionContext.Implicits.global
    val example = "Hello World!"
    val strings: Stream[IO, Byte] = Stream.emit(example).through(text.utf8Encode)

    override def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, StreamApp.ExitCode] =
    for {
    expectedHash <- Stream.eval(IO(MessageDigest.getInstance("SHA-256").digest(example.getBytes("UTF-8")).toHexString))
    promisedHash <- Stream.eval(Promise.empty[IO, String])
    _ <- strings.through(Sha256Pipe(promisedHash)).through(text.utf8Decode).fold1(_ + _).to(Sink(s => IO(println(s"Text: $s"))))
    actualHash <- Stream.eval(promisedHash.get)
    _ <- Stream.eval {
    for {
    _ <- IO(println(s"Expected hash: $expectedHash"))
    _ <- IO(println(s"Calculated hash: $actualHash"))
    _ <- IO(println(s"Are hashes equal? ${expectedHash == actualHash}"))
    } yield ()
    }
    } yield StreamApp.ExitCode.Success
    }