Skip to content

Instantly share code, notes, and snippets.

@mkrajc
Created March 25, 2020 10:05
Show Gist options
  • Save mkrajc/2f8b21922252a23a9421e1b2329712a7 to your computer and use it in GitHub Desktop.
Save mkrajc/2f8b21922252a23a9421e1b2329712a7 to your computer and use it in GitHub Desktop.
Scala BoundedPriorityQueue
package com.merlonintelligence.commons.stream
import java.io.Serializable
import java.util
import java.util.PriorityQueue
import scala.collection.JavaConverters._
object BoundedPriorityQueue {
// since heap does not preserve order for the elements of the same priority,
// each element will be wrapped together with its incremental ordinal number
private[BoundedPriorityQueue] final case class Elem[A: Ordering](value: A, ordinal: Long)
implicit def elemOrdering[A](implicit ord: Ordering[A]): Ordering[Elem[A]] = (x: Elem[A], y: Elem[A]) => {
val cmp = ord.compare(x.value, y.value)
if (cmp == 0) x.ordinal.compareTo(y.ordinal) else cmp
}
def apply[A: Ordering](maxSize: Int): BoundedPriorityQueue[A] = new BoundedPriorityQueue(maxSize)
}
/**
* BoundedPriorityQueue is scala wrapper around java's PriorityQueue. It is not thread-safe and has additional properties:
*
* 1. It's bounded - it allows only up to maxSize elements in the queue
* 2. It preserves the order of elements if the priority is same
*
* Second property makes it generic enough, to be used as a regular queue.
* When the queue is full it rejects new elements to be added into queue.
*
* The second property is implemented by including a sequence number into the element itself
* Priority is based on ordering of element type A, lesser are higher priority.
*
* @param maxSize maximum size of the queue
* @param ord ordering of elements
* @tparam A type of elements in queue
*/
case class BoundedPriorityQueue[A] private (maxSize: Int)(implicit ord: Ordering[A])
extends util.AbstractQueue[A]
with Serializable {
import BoundedPriorityQueue._
private val underlying = new PriorityQueue[Elem[A]](maxSize, elemOrdering)
private var counter = 0L
override def clear(): Unit = underlying.clear()
override def toString: String = s"BoundedPriorityQueue(${size()}/$maxSize)"
private def safeElem(elem: Elem[A]): A =
if (elem != null) elem.value
else null.asInstanceOf[A]
override def poll(): A = safeElem(underlying.poll())
override def offer(elem: A): Boolean =
if (size < maxSize) {
counter += 1
val key = Elem(elem, counter)
underlying.offer(key)
} else false
override def peek(): A = safeElem(underlying.peek())
override def size(): Int = underlying.size()
/**
* Important: iterator is not guaranteed to traverse in priority order
*/
override def iterator(): util.Iterator[A] = new util.Iterator[A] {
private val underlyingIterator = underlying.iterator()
override def hasNext: Boolean = underlyingIterator.hasNext
override def next(): A = safeElem(underlyingIterator.next())
}
def nonEmpty: Boolean = !isEmpty
/**
* Important: iterator is not guaranteed to traverse in priority order
*/
def toIterator: Iterator[A] = asScalaIterator(iterator())
/**
* Elements in list are in the priority order.
*/
def toList: List[A] = toIterator.toList.sorted
def isFull: Boolean = size == maxSize
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment