Skip to content

Instantly share code, notes, and snippets.

@s5bug
Created January 30, 2026 18:56
Show Gist options
  • Select an option

  • Save s5bug/c1b1883512fe0db5e379f886ceca7a86 to your computer and use it in GitHub Desktop.

Select an option

Save s5bug/c1b1883512fe0db5e379f886ceca7a86 to your computer and use it in GitHub Desktop.
/** Closed set of outcomes a decoder step can report. */
sealed abstract class DecodeResult[+A]

/** The input seen so far cannot be decoded. */
final case class Reject() extends DecodeResult[Nothing]

/** No decision yet; `state` is what the decoder needs in order to resume on the next chunk. */
final case class Nonfinal[S](state: S) extends DecodeResult[Nothing]

/** Decoding succeeded with value `a`; `remainder` holds the bytes that were not consumed. */
final case class Accept[+A](remainder: Vector[Byte], a: A) extends DecodeResult[A]
/** A decoder that is fed bytes chunk by chunk and eventually yields an `A` or rejects.
  *
  * @tparam A the decoded value type
  */
trait StreamingDecode[A] {

  /** Implementation-specific state carried from one chunk to the next. */
  type State

  /** Outcome of feeding a chunk: a decision, or a state to resume from. */
  final type Result = Accept[A] | Reject | Nonfinal[State]

  /** Outcome once the input has ended: a decision is mandatory. */
  final type EndResult = Accept[A] | Reject

  /** The state to use before any byte has been fed. */
  def start: State

  /** Feeds `chunk` to the decoder resuming from `current`.
    *
    * Implementations may assume `chunk` is nonempty in practice.
    */
  def process(current: State, chunk: Vector[Byte]): Result

  /** Signals that no more bytes will arrive after those already fed into `current`. */
  def endOfStream(current: State): EndResult
}
object StreamingDecode {
/** Decoder that accepts exactly one byte equal to `byte` and yields `()`.
  *
  * It keeps no information between chunks, so its state is `Unit`.
  */
final case class SingleByte(byte: Byte) extends StreamingDecode[Unit] {
  override final type State = Unit

  override final def start: Unit = ()

  /** Inspects only the first byte of `chunk`; the rest is handed back as the remainder. */
  override final def process(current: Unit, chunk: Vector[Byte]): Result =
    chunk match {
      case first +: rest =>
        if first == byte then Accept(rest, ()) else Reject()
      case _ =>
        // Nothing to look at yet: stay undecided.
        Nonfinal(())
    }

  /** The expected byte never arrived, so the stream is rejected. */
  override final def endOfStream(current: Unit): EndResult =
    Reject()
}
/** Decodes `n` consecutive occurrences of `single` and yields their values in order.
  *
  * When `n <= 0` the decoder accepts immediately with an empty vector and consumes nothing.
  *
  * @param single decoder for one element; each element is decoded from `single.start`
  * @param n      number of elements required
  */
final case class Repeat[A](single: StreamingDecode[A], n: Int) extends StreamingDecode[Vector[A]] {
  /** State of the element currently being decoded, plus the elements already completed. */
  override final case class State(subState: single.State, accumulator: Vector[A])

  override final def start: State = State(single.start, Vector.empty)

  /** Feeds `chunk` to the element in progress; whenever an element completes, the bytes it
    * left over are fed to the next element until `n` elements are collected, the input is
    * rejected, or an element needs more bytes.
    */
  override final def process(current: State, chunk: Vector[Byte]): Result =
    if current.accumulator.length >= n then
      // Enough elements already (this also covers n <= 0): nothing from `chunk` is consumed.
      Accept(chunk, current.accumulator)
    else
      single.process(current.subState, chunk) match {
        case Accept(rest, a) =>
          // Start the next element on the leftover bytes; the check above ends the recursion.
          process(State(single.start, current.accumulator :+ a), rest)
        case Reject() => Reject()
        case Nonfinal(s) => Nonfinal(State(s, current.accumulator))
      }

  /** Finishes the element in progress at end of input. If it accepts and elements are still
    * missing, the bytes it handed back are decoded as further elements, mirroring how
    * `Seq.endOfStream` continues into its second decoder.
    */
  override final def endOfStream(current: State): EndResult =
    if current.accumulator.length >= n then
      Accept(Vector.empty, current.accumulator)
    else
      single.endOfStream(current.subState) match {
        case Reject() => Reject()
        case Accept(rest, a) =>
          val next = State(single.start, current.accumulator :+ a)
          if rest.isEmpty then
            // No bytes left to feed: let the next element (if any) decide at end of stream.
            endOfStream(next)
          else
            process(next, rest) match {
              case Accept(tail, items) => Accept(tail, items)
              case Reject() => Reject()
              case Nonfinal(pending) => endOfStream(pending)
            }
      }
}
/** Tries `first`; if it rejects, replays every byte seen so far into `second`.
  *
  * A success of `first` is reported as `Left`, a success of `second` as `Right`.
  */
final case class Choice[A, B](first: StreamingDecode[A], second: StreamingDecode[B]) extends StreamingDecode[Either[A, B]] {
  sealed abstract class State

  /** Still running `first`; `accumulator` buffers all bytes fed so far so they can be
    * replayed into `second` (it is essentially a backtracking mark).
    */
  final case class StateA(accumulator: Vector[Byte], aState: first.State) extends State

  /** `first` has rejected; only `second` is running and no buffering is needed any more. */
  final case class StateB(bState: second.State) extends State

  override final def start: State = StateA(Vector.empty, first.start)

  // Re-tags an in-stream outcome of `second` as the right-hand alternative.
  private def tagRight(outcome: second.Result): Result =
    outcome match {
      case Accept(rest, b) => Accept(rest, Right(b))
      case Reject() => Reject()
      case Nonfinal(next) => Nonfinal(StateB(next))
    }

  // Re-tags an end-of-stream outcome of `second` as the right-hand alternative.
  private def tagRightAtEnd(outcome: second.EndResult): EndResult =
    outcome match {
      case Accept(rest, b) => Accept(rest, Right(b))
      case Reject() => Reject()
    }

  override final def process(current: State, chunk: Vector[Byte]): Result =
    current match {
      case StateB(bCurr) =>
        tagRight(second.process(bCurr, chunk))
      case StateA(seenBefore, aCurr) =>
        val seen = seenBefore ++ chunk
        first.process(aCurr, chunk) match {
          case Accept(rest, a) => Accept(rest, Left(a))
          case Nonfinal(next) => Nonfinal(StateA(seen, next))
          case Reject() =>
            // Backtrack: give `second` everything `first` has looked at, from the beginning.
            tagRight(second.process(second.start, seen))
        }
    }

  override final def endOfStream(current: State): EndResult =
    current match {
      case StateB(bCurr) =>
        tagRightAtEnd(second.endOfStream(bCurr))
      case StateA(seen, aCurr) =>
        first.endOfStream(aCurr) match {
          case Accept(rest, a) => Accept(rest, Left(a))
          case Reject() =>
            // Backtrack, then force `second` to decide if it is still waiting for input.
            second.process(second.start, seen) match {
              case Accept(rest, b) => Accept(rest, Right(b))
              case Reject() => Reject()
              case Nonfinal(pending) => tagRightAtEnd(second.endOfStream(pending))
            }
        }
    }
}
/** Runs `first`, then runs `second` on whatever bytes `first` left over, and yields both
  * values as a pair.
  */
final case class Seq[A, B](first: StreamingDecode[A], second: StreamingDecode[B]) extends StreamingDecode[(A, B)] {
  sealed abstract class State

  /** `first` is still in progress. */
  final case class StateA(aState: first.State) extends State

  /** `first` has produced `a`; `second` is in progress. */
  final case class StateB(bState: second.State, a: A) extends State

  override final def start: State = StateA(first.start)

  // Combines the already-decoded `a` with an in-stream outcome of `second`.
  private def pairWith(a: A, outcome: second.Result): Result =
    outcome match {
      case Accept(rest, b) => Accept(rest, (a, b))
      case Reject() => Reject()
      case Nonfinal(next) => Nonfinal(StateB(next, a))
    }

  // Combines the already-decoded `a` with an end-of-stream outcome of `second`.
  private def pairWithAtEnd(a: A, outcome: second.EndResult): EndResult =
    outcome match {
      case Accept(rest, b) => Accept(rest, (a, b))
      case Reject() => Reject()
    }

  override final def process(current: State, chunk: Vector[Byte]): Result =
    current match {
      case StateB(bState, a) =>
        pairWith(a, second.process(bState, chunk))
      case StateA(aState) =>
        first.process(aState, chunk) match {
          case Reject() => Reject()
          case Nonfinal(next) => Nonfinal(StateA(next))
          case Accept(leftover, a) =>
            // `first` is done: start `second` right away on the bytes it did not consume.
            pairWith(a, second.process(second.start, leftover))
        }
    }

  override final def endOfStream(current: State): EndResult =
    current match {
      case StateB(bState, a) =>
        pairWithAtEnd(a, second.endOfStream(bState))
      case StateA(aState) =>
        first.endOfStream(aState) match {
          case Reject() => Reject()
          case Accept(leftover, a) =>
            // Feed the leftover to `second`, then force a decision if it is still undecided.
            second.process(second.start, leftover) match {
              case Accept(rest, b) => Accept(rest, (a, b))
              case Reject() => Reject()
              case Nonfinal(pending) => pairWithAtEnd(a, second.endOfStream(pending))
            }
        }
    }
}
/** Runs `dec` unchanged but replaces whatever it decodes with the fixed `value`.
  *
  * State, rejection and remainders are passed through from `dec` as they are.
  */
final case class As[A, B](dec: StreamingDecode[A], value: B) extends StreamingDecode[B] {
  override final type State = dec.State

  override final def start: State = dec.start

  override final def process(state: State, chunk: Vector[Byte]): Result =
    dec.process(state, chunk) match {
      case Nonfinal(next) => Nonfinal(next)
      case Reject() => Reject()
      case Accept(rest, _) => Accept(rest, value) // discard the decoded value
    }

  override final def endOfStream(state: State): EndResult =
    dec.endOfStream(state) match {
      case Reject() => Reject()
      case Accept(rest, _) => Accept(rest, value) // discard the decoded value
    }
}
}
import StreamingDecode.*
// Demo: four-byte magic numbers built from the combinators above.

// Decoder for the byte sequence b0 b1 b2 b3 (each given as an Int and narrowed to Byte),
// yielding `label` on success.
def fourByteTag(b0: Int, b1: Int, b2: Int, b3: Int, label: String) =
  As(
    Seq(
      Seq(SingleByte(b0.toByte), SingleByte(b1.toByte)),
      Seq(SingleByte(b2.toByte), SingleByte(b3.toByte))
    ),
    label
  )

// Builds an input chunk from Int literals.
def bytes(values: Int*): Vector[Byte] = values.toVector.map(_.toByte)

val cafebabe = fourByteTag(0xCA, 0xFE, 0xBA, 0xBE, "cafebabe")
// Three of four bytes: still undecided (Nonfinal).
cafebabe.process(cafebabe.start, bytes(0xCA, 0xFE, 0xBA))
// All four bytes: Accept with an empty remainder and "cafebabe".
cafebabe.process(cafebabe.start, bytes(0xCA, 0xFE, 0xBA, 0xBE))

val deadbeef = fourByteTag(0xDE, 0xAD, 0xBE, 0xEF, "deadbeef")
// Third byte is 0xBA instead of 0xBE: Reject.
deadbeef.process(deadbeef.start, bytes(0xDE, 0xAD, 0xBA))
// All four bytes: Accept with an empty remainder and "deadbeef".
deadbeef.process(deadbeef.start, bytes(0xDE, 0xAD, 0xBE, 0xEF))

val cafebeef = fourByteTag(0xCA, 0xFE, 0xBE, 0xEF, "cafebeef")
val babeOrBeef = Choice(cafebabe, cafebeef)
// `cafebabe` rejects at the third byte, so the input is replayed into `cafebeef`:
// Accept with an empty remainder and Right("cafebeef").
babeOrBeef.process(babeOrBeef.start, bytes(0xCA, 0xFE, 0xBE, 0xEF))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment