Created
February 3, 2021 16:31
-
-
Save darkfrog26/481be7428fd1ebec1a0f5b05001467a8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.ConcurrentLinkedQueue | |
import scala.concurrent.duration.FiniteDuration | |
import scala.concurrent.{ExecutionContext, Future, TimeoutException} | |
import scala.util.{Failure, Success} | |
/**
 * An [[Iterator]] whose elements are produced by a list of futures.
 *
 * `createFutures()` is invoked once, while the trait is being initialised. The
 * resulting futures are then observed one after another, in list order: when a
 * future succeeds its results are appended to an internal queue and the next
 * future is observed. The first failed future stops the chain; its exception is
 * re-thrown from `hasNext` once every element buffered before it has been
 * consumed.
 *
 * `hasNext` blocks (polling every 10 ms) while no element is buffered and the
 * chain has not finished. If it has to wait longer than `maxTimeout` it throws a
 * `TimeoutException`, discards anything buffered and marks the iterator as
 * finished.
 *
 * NOTE(review): the abstract members are called from the trait initialiser, so
 * implementations should define `createFutures`, `executionContext` (and anything
 * they depend on) as `def`/`lazy val` or constructor parameters; a plain `val`
 * in the subclass body would not be initialised yet at that point.
 *
 * The class appears to be designed for a single consuming thread; concurrent
 * consumers are not coordinated.
 *
 * @tparam T the element type
 */
trait ParallelIterator[T] extends Iterator[T] { iterator =>
  // Elements delivered by completed futures that have not been consumed yet.
  private val buffer = new ConcurrentLinkedQueue[T]

  // Both flags are written from future callbacks and read from the consuming
  // thread's polling loop, so they must be volatile to guarantee visibility.
  @volatile private var finished = false
  @volatile private var failure: Option[Throwable] = None

  private lazy val maxTimeoutMillis = maxTimeout.toMillis

  /** Longest time a single `hasNext` call may wait for new data. */
  protected def maxTimeout: FiniteDuration

  /** Supplies the futures whose results make up this iterator, in iteration order. */
  protected def createFutures(): List[Future[Seq[T]]]

  /** Execution context on which the completion callbacks are registered. */
  protected implicit def executionContext: ExecutionContext

  buildFutures()

  private def buildFutures(): Unit = recurseFutures(createFutures())

  /**
   * Observes the head future and, once it succeeds, buffers its results and
   * moves on to the tail. A failure (or an exhausted list) ends the chain.
   */
  private def recurseFutures(futures: List[Future[Seq[T]]]): Unit = futures match {
    case head :: tail =>
      head.onComplete {
        case Success(results) =>
          results.foreach(buffer.add)
          recurseFutures(tail)
        case Failure(throwable) =>
          // Publish the failure before the flag so a reader that sees
          // `finished` is guaranteed to see the failure as well.
          failure = Some(throwable)
          finished = true
      }
    case Nil =>
      finished = true
  }

  /**
   * Blocks until an element is buffered, the chain has finished, or
   * `maxTimeout` has elapsed.
   *
   * On timeout (or any other throwable, e.g. an interrupt while sleeping) the
   * iterator is marked finished, the buffer is cleared and the throwable is
   * re-thrown.
   */
  private def awaitProgress(): Unit =
    try {
      val start = System.currentTimeMillis()
      while (!finished && buffer.isEmpty) {
        if (System.currentTimeMillis() - start > maxTimeoutMillis) {
          val exc = new TimeoutException("Max timeout expired")
          failure = Some(exc)
          finished = true
          throw exc
        }
        Thread.sleep(10)
      }
    } catch {
      case t: Throwable =>
        finished = true
        buffer.clear()
        throw t
    }

  /**
   * @return `true` if an element is available, `false` once every future has
   *         completed successfully and the buffer is drained
   * @throws TimeoutException if no data arrived within `maxTimeout`
   * @throws Throwable the failure of a source future, once the elements
   *                   buffered before it have been consumed
   */
  override def hasNext: Boolean =
    if (!buffer.isEmpty) {
      true
    } else {
      if (!finished) awaitProgress()
      // The buffer must be re-checked AFTER `finished` has been observed:
      // producers add their elements before setting the flag, so checking in
      // the opposite order could report exhaustion while elements are queued.
      if (!buffer.isEmpty) {
        true
      } else {
        // Drained and finished: surface a recorded failure instead of
        // silently ending the iteration.
        failure match {
          case Some(t) => throw t
          case None => false
        }
      }
    }

  /**
   * Returns the next element, waiting for it if necessary.
   *
   * @throws NoSuchElementException if the iterator is exhausted
   */
  override def next(): T =
    if (hasNext) buffer.poll()
    else throw new NoSuchElementException("next on empty ParallelIterator")
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment