Created
July 23, 2020 09:23
-
-
Save vhutov/cfdb274aae762027de28ce6ef26bdaad to your computer and use it in GitHub Desktop.
PartialStreamConsume
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package object consume { | |
implicit class RichStream[F[_], A](s: Stream[F, A]) { | |
def consumeUntil(f: A => Boolean) | |
(implicit C: Stream.Compiler[F, F], ME: MonadError[F, Throwable]): F[(Vector[A], Stream[F, A])] = { | |
consumeUntilAs(f, _.some) | |
} | |
def consumeUntilAs[B](f: A => Boolean, g: A => Option[B]) | |
(implicit C: Stream.Compiler[F, F], ME: MonadError[F, Throwable]): F[(Vector[B], Stream[F, B])] = { | |
consumeUntil0(f, g) | |
.take(1) | |
.compile | |
.lastOrError | |
} | |
private def consumeUntil0[B](f: A => Boolean, g: A => Option[B]): Stream[F, (Vector[B], Stream[F, B])] = { | |
def go(str: Stream[F, A], buffer: Vector[B]): Pull[F, (Vector[B], Stream[F, B]), Unit] = { | |
str.pull.uncons.flatMap { | |
case None => Pull.output1(buffer -> Stream.empty) >> Pull.done | |
case Some((h, t)) => | |
val idx = h.indexWhere(f) | |
idx match { | |
case None => go(t, buffer ++ h.toVector.mapFilter(g)) | |
case Some(idx) => | |
val (before, after) = h.splitAt(idx) | |
val newBuffer = buffer ++ before.toVector.mapFilter(g) | |
val newTail = (Stream.chunk(after) ++ t).mapFilter(g) | |
Pull.output1(newBuffer -> newTail) >> Pull.done | |
} | |
} | |
} | |
go(s, Vector()).stream | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object UseCase extends IOApp { | |
override def run(args: List[String]): IO[ExitCode] = { | |
val s = Stream[IO, Either[String, Int]](1.asRight, 2.asRight, 3.asRight) ++ | |
Stream.sleep_(1.second) ++ | |
Stream[IO, Either[String, Int]]("token".asLeft) ++ | |
Stream[IO, Either[String, Int]](4.asRight, 5.asRight, "token".asLeft, 6.asRight) | |
val partiallyConsumed: IO[(Vector[Int], Stream[IO, Int])] = s.consumeUntilAs(_.isLeft, _.right.toOption) | |
partiallyConsumed.flatMap { case (initialState, rest) => | |
rest.scan(initialState)(_ :+ _) | |
.evalTap(buff => IO(println(buff))) | |
.compile | |
.drain | |
}.as(ExitCode.Success) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment