Created
December 10, 2012 13:52
-
-
Save sgodbillon/4250672 to your computer and use it in GitHub Desktop.
Bug Enumerator (StackOverflowError in a Promise)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package bugs | |
import play.api.libs.iteratee._ | |
import scala.util.Failure | |
import scala.util.Success | |
import scala.concurrent.Future | |
import scala.concurrent.Promise | |
object StackOverflowErrorBug { | |
import scala.concurrent.ExecutionContext.Implicits.global | |
trait Cursor { | |
def iterator :Iterator[String] | |
def hasNext :Boolean | |
def next :Future[Cursor] | |
def i: Int = 0 | |
def n(cursor: Cursor) = | |
if(cursor.iterator.hasNext) { | |
Future(Some((cursor,Some(cursor.iterator.next)))) | |
} else if (cursor.hasNext) { | |
val fut = //cursor.next.map(c => Some((c,None))) | |
Future(Some(DefaultCursor(cursor.i + 1) -> None)) | |
print(fut + ",") | |
fut | |
} else { | |
Future(None) | |
} | |
def enumerate = { | |
CustomEnumerator.unfoldM(this) { cursor => | |
n(cursor) | |
}.andThen(Enumerator.eof).onDoneEnumerating{ | |
println("done") | |
} &> Enumeratee.collect { | |
case Some(e) => e | |
} | |
} | |
object CustomEnumerator { | |
def unfoldM[S,E](s:S)(f: S => Future[Option[(S,E)]] ): Enumerator[E] = checkContinue1(s)(new TreatCont1[E,S]{ | |
def apply[A](loop: (Iteratee[E,A],S) => Future[Iteratee[E,A]], s:S, k: Input[E] => Iteratee[E,A]):Future[Iteratee[E,A]] = f(s).flatMap { | |
case Some((newS,e)) => { | |
// if we don't create this intermediate promise, then a stackoverflowerror is eventually thrown | |
// original code -> | |
// loop(k(Input.El(e)),newS) | |
// <- original code | |
val promise = Promise[play.api.libs.iteratee.Iteratee[E,A]]() | |
loop(k(Input.El(e)),newS).onComplete { | |
case Success(s) => | |
promise.success(s) | |
case Failure(f) => | |
promise.failure(f) | |
} | |
promise.future | |
} | |
case None => Future(Cont(k)) | |
} | |
}) | |
trait TreatCont1[E,S]{ | |
def apply[A](loop: (Iteratee[E,A],S) => Future[Iteratee[E,A]], s:S, k: Input[E] => Iteratee[E,A]):Future[Iteratee[E,A]] | |
} | |
def checkContinue1[E,S](s:S)(inner:TreatCont1[E,S]) = new Enumerator[E] { | |
def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = { | |
def step(it: Iteratee[E, A], state:S): Future[Iteratee[E,A]] = it.fold{ | |
case Step.Done(a, e) => Future(Done(a,e)) | |
case Step.Cont(k) => inner[A](step,state,k) | |
case Step.Error(msg, e) => Future(Error(msg,e)) | |
} | |
step(it,s) | |
} | |
} | |
def loop(cursor: Cursor) :Future[Option[(Option[String], Cursor)]] = { | |
if(cursor.iterator.hasNext) | |
Future(Some(Some(cursor.iterator.next) -> cursor)) | |
else if(cursor.hasNext) | |
cursor.next.map(c => Some(None -> c)) | |
else Future(None) | |
} | |
} | |
} | |
case class DefaultCursor(override val i: Int) extends Cursor { | |
val iterator = { | |
val r = (for(j <- 0 to 1) yield i + "" + j) | |
r.toIterator | |
} | |
def hasNext = i < 5000 | |
def next = { | |
Future(DefaultCursor(i + 1)) | |
} | |
} | |
case class FlattenedCursor(cursor: Future[Cursor]) extends Cursor { | |
val iterator = Iterator.empty | |
def hasNext = true | |
def next = cursor | |
} | |
// should print "done: <some result>" at the end | |
def test = { | |
val enumerator = FlattenedCursor(Future(DefaultCursor(0))).enumerate | |
val fut = enumerator.apply(Iteratee.foreach({ e => | |
//println(e) | |
})) | |
val ff = Iteratee.flatten(fut).run | |
ff.onComplete { | |
case e => | |
println("done: " + e) | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
name := "TestIteratees" | |
version := "1.0" | |
scalaVersion := "2.10.0-RC1" | |
resolvers += "Typesafe repository snapshots" at "http://repo.typesafe.com/typesafe/snapshots/" | |
resolvers += "Typesafe repository releases" at "http://repo.typesafe.com/typesafe/releases/" | |
libraryDependencies ++= Seq( | |
"play" % "play-iteratees_2.10" % "2.1-RC1" | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
not minimal enough? here might be a smaller one:
java.lang.IllegalStateException: Promise already completed.
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:58)
at scala.concurrent.Promise$class.failure(Promise.scala:107)
at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:58)
at scala.concurrent.Future$$anonfun$flatMap$1.liftedTree3$1(Future.scala:283)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:277)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:274)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:29)
at scala.concurrent.forkjoin.ForkJoinTask$AdaptedRunnableAction.exec(ForkJoinTask.java:1417)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:915)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:980)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)