Created
March 22, 2018 13:38
-
-
Save akozhemiakin/4934a5ab463f7d2104c3bb35dd88a2a8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import monix.execution.Ack.{Continue, Stop} | |
import monix.execution.{Ack, Scheduler} | |
import monix.reactive.Observable.Operator | |
import monix.reactive.observers.Subscriber | |
import scala.concurrent.{Future, Promise} | |
/** Operator that decouples upstream from downstream through a bounded in-memory buffer.
  *
  * Upstream elements are appended to the buffer while a separate feed loop pushes them
  * to `out` one at a time, waiting for each `Ack`. Upstream receives `Continue` while
  * fewer than `size` elements are buffered; once the buffer is full, upstream is handed
  * a pending `Future[Ack]` that is completed as soon as the feed loop removes an element.
  *
  * Termination:
  *  - `onComplete` is forwarded to `out` after every buffered element has been delivered;
  *  - `onError` discards the buffered elements and is forwarded to `out` as soon as no
  *    `out.onNext` call is in flight;
  *  - when `out` answers `Stop` (or its `Ack` fails), the buffer is dropped and upstream
  *    is told to `Stop`.
  *
  * All calls on `out` are made from the feed loop, so they never overlap.
  *
  * @param size maximum number of buffered elements before upstream is back-pressured;
  *             must be positive
  * @tparam A   element type
  */
class ElasticBufferOperator[A](size: Int) extends Operator[A, A] {
  import scala.util.control.NonFatal
  import scala.util.{Failure, Success}

  // A non-positive size would back-pressure upstream forever after the first element.
  require(size > 0, s"size must be positive, got $size")

  override def apply(out: Subscriber[A]): Subscriber[A] = new Subscriber[A] {
    override implicit val scheduler: Scheduler = out.scheduler

    // Guards every field below: upstream callbacks and the feed loop may run on
    // different threads.
    private[this] val lock = new AnyRef
    // Vector gives cheap append, head/tail and size (List made each of these O(n)).
    private[this] var buffer: Vector[A] = Vector.empty
    // Set while the feed loop is idle, waiting for the next element (None = end of stream).
    private[this] var consumer: Option[Promise[Option[A]]] = None
    // Set while upstream is back-pressured because the buffer is full.
    private[this] var blocked: Option[Promise[Ack]] = None
    // Downstream answered Stop (or failed); nothing more may be sent to `out`.
    private[this] var stopped: Boolean = false
    // Upstream has signalled onComplete or onError.
    private[this] var isDone: Boolean = false
    // The upstream error, if the stream ended with onError.
    private[this] var failure: Option[Throwable] = None

    feed()

    override def onError(ex: Throwable): Unit = {
      var wake: Option[Promise[Option[A]]] = None
      var release: Option[Promise[Ack]] = None
      val accepted = lock.synchronized {
        if (isDone || stopped) false
        else {
          isDone = true
          failure = Some(ex)
          buffer = Vector.empty // errors short-circuit: pending elements are dropped
          wake = takeConsumer()
          release = takeBlocked()
          true
        }
      }
      // Promises are completed outside the lock so their callbacks never run under it.
      if (!accepted) scheduler.reportFailure(ex) // cannot be delivered; do not swallow it
      wake.foreach(_.success(None))
      release.foreach(_.success(Stop))
    }

    override def onComplete(): Unit = {
      val wake: Option[Promise[Option[A]]] = lock.synchronized {
        if (isDone || stopped) None
        else {
          isDone = true
          // If the feed loop is busy it will notice `isDone` once the buffer drains.
          takeConsumer()
        }
      }
      wake.foreach(_.success(None))
    }

    override def onNext(elem: A): Future[Ack] = {
      var handOff: Option[Promise[Option[A]]] = None
      val ack: Future[Ack] = lock.synchronized {
        if (stopped || isDone) Stop
        else {
          consumer match {
            case Some(_) =>
              // The feed loop is idle: hand the element over without buffering it.
              handOff = takeConsumer()
            case None =>
              buffer = buffer :+ elem
          }
          if (buffer.size < size) Continue
          else {
            val p = Promise[Ack]()
            blocked = Some(p)
            p.future
          }
        }
      }
      handOff.foreach(_.success(Some(elem)))
      ack
    }

    /** Clears and returns the idle feed loop's promise. Call only while holding `lock`. */
    private[this] def takeConsumer(): Option[Promise[Option[A]]] = {
      val c = consumer
      consumer = None
      c
    }

    /** Clears and returns the back-pressure promise. Call only while holding `lock`. */
    private[this] def takeBlocked(): Option[Promise[Ack]] = {
      val b = blocked
      blocked = None
      b
    }

    /** Next element for downstream: `Some(elem)`, or `None` once upstream has terminated
      * and nothing is left to deliver. Pending until upstream produces something when the
      * buffer is empty and the stream is still open.
      */
    private[this] def nextElement(): Future[Option[A]] = {
      var release: Option[Promise[Ack]] = None
      val next: Future[Option[A]] = lock.synchronized {
        if (buffer.nonEmpty) {
          val head = buffer.head
          buffer = buffer.tail
          release = takeBlocked() // a slot was freed, upstream may resume
          Future.successful(Some(head))
        } else if (isDone) {
          Future.successful(None)
        } else {
          val p = Promise[Option[A]]()
          consumer = Some(p)
          p.future
        }
      }
      release.foreach(_.success(Continue))
      next
    }

    /** Marks downstream as stopped, drops buffered elements and releases upstream. */
    private[this] def halt(): Unit = {
      val release = lock.synchronized {
        stopped = true
        buffer = Vector.empty
        takeBlocked()
      }
      release.foreach(_.success(Stop))
    }

    /** Forwards the terminal event recorded from upstream to `out`. */
    private[this] def signalEnd(): Unit =
      lock.synchronized(failure) match {
        case Some(ex) => out.onError(ex)
        case None     => out.onComplete()
      }

    /** Feed loop: deliver one element to `out`, wait for its `Ack`, repeat. */
    private[this] def feed(): Unit =
      nextElement().foreach {
        case Some(elem) =>
          val ack: Future[Ack] =
            try out.onNext(elem)
            catch { case NonFatal(ex) => Future.failed(ex) }
          ack.onComplete {
            case Success(Continue) => feed()
            case Success(Stop)     => halt()
            case Failure(ex) =>
              // Downstream broke; stop feeding it and surface the error.
              halt()
              scheduler.reportFailure(ex)
          }
        case None =>
          signalEnd()
      }
  }
}
/** Extension syntax for observables; bring it into scope with `import syntax._`. */
object syntax {

  /** Enriches an `Observable[A]` with the `bufferElastic` combinator. */
  implicit class ObservableOps[A](a: Observable[A]) {

    /** Lifts a new `ElasticBufferOperator` built with `size` onto the wrapped observable.
      *
      * @param size value passed to the `ElasticBufferOperator` constructor
      * @return the observable produced by `liftByOperator`
      */
    final def bufferElastic(size: Int): Observable[A] = {
      val operator = new ElasticBufferOperator[A](size)
      a.liftByOperator(operator)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment