Last active
June 6, 2022 01:08
-
-
Save LiutongZhou/57a961aaa07b73e1bfbda0667e858fec to your computer and use it in GitHub Desktop.
Priority queues
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""MinHeap and MaxHeap (Optimized Implementation)""" | |
from abc import ABC, abstractmethod | |
from collections import Counter, UserList | |
from functools import singledispatchmethod | |
from heapq import ( | |
_heapify_max, | |
_heappop_max, | |
_heapreplace_max, | |
_siftdown, | |
_siftdown_max, | |
_siftup, | |
_siftup_max, | |
heapify, | |
heappop, | |
heappush, | |
heappushpop, | |
heapreplace, | |
) | |
from typing import Any, Iterable, Optional, Union, overload | |
__author__ = "Liutong Zhou" | |
__all__ = ["MinHeap", "MaxHeap"] | |
class _BasePriorityQueue(ABC, UserList): | |
"""Base Class for MinHeap and MaxHeap""" | |
@abstractmethod | |
def __init__(self, queue: Optional[Iterable] = None): | |
super().__init__(queue) | |
self._to_remove = Counter() # cache items for lazy removal | |
self.heapify() | |
@abstractmethod | |
def heapify(self): | |
raise NotImplementedError | |
@abstractmethod | |
def push(self, element: Any): | |
raise NotImplementedError | |
@abstractmethod | |
def pop(self) -> Any: | |
raise NotImplementedError | |
@abstractmethod | |
def pushpop(self, element: Any) -> Any: | |
"""Push an element and pop the top element""" | |
raise NotImplementedError | |
@abstractmethod | |
def replace(self, element: Any) -> Any: | |
"""Pop the top element and push the element""" | |
raise NotImplementedError | |
@abstractmethod | |
def siftup(self, pos: int): | |
raise NotImplementedError | |
@abstractmethod | |
def siftdown(self, start_pos: int, pos: int): | |
raise NotImplementedError | |
@singledispatchmethod | |
def __delitem__(self, i: Union[int, slice]) -> None: | |
"""Deletes item i or sliced items from the heap""" | |
raise NotImplementedError("See specific single dispatch implementation") | |
@__delitem__.register | |
def _(self, i: int) -> None: | |
"""Deletes item i from the heap in O(log N) time""" | |
self.data[i] = self.data[-1] | |
del self.data[-1] | |
if i < len(self): | |
self.siftup(i) | |
self.siftdown(0, i) | |
@__delitem__.register | |
def _(self, i: slice) -> None: | |
"""Deletes sliced items from the heap in O(k log N) time""" | |
for index in range(len(self))[i]: | |
del self[index] | |
@overload | |
def __setitem__(self, i: int, o: Any) -> None: | |
... | |
@overload | |
def __setitem__(self, i: slice, o: Iterable[Any]) -> None: | |
... | |
@singledispatchmethod | |
def __setitem__(self, i, o) -> None: | |
"""Set item(s) o to index (slice) i""" | |
raise NotImplementedError("See specific single dispatch implementation") | |
@__setitem__.register | |
def _(self, i: int, o: Any) -> None: | |
"""Set item o to self[i] in O(log N) time""" | |
self.data[i] = o | |
self.siftup(i) | |
self.siftdown(0, i) | |
@__setitem__.register | |
def _(self, i: slice, o: Iterable[Any]) -> None: | |
"""Set items o to a slice of self[i]""" | |
for index, item in zip(range(len(self))[i], o): | |
self[index] = item | |
def remove(self, item, lazy=True): | |
"""Deletes item from heap | |
If lazy, mark item for deletion in self._to_remove. Items marked for deletion | |
will not be actually deleted until after pop / pushpop / replace is executed | |
so deletion can be done in O(log N) time. | |
If not lazy, deletes item immediately in O(N) time if item exists | |
otherwise raise IndexError. | |
Parameters | |
---------- | |
item : Any | |
lazy : bool | |
""" | |
if not lazy: | |
index = self.index(item) | |
del self[index] | |
else: | |
self._to_remove.update((item,)) | |
self._lazy_remove() | |
@abstractmethod | |
def _lazy_remove(self): | |
"""Execute lazy remove, the actual cleaning step in O(log N) time""" | |
_to_remove: Counter = self._to_remove | |
while self and self[0] in _to_remove: | |
removed = heappop(self.data) | |
_to_remove.subtract((removed,)) | |
if _to_remove[removed] == 0: | |
del _to_remove[removed] | |
class MinHeap(_BasePriorityQueue): | |
"""Min Heap -- The min element is always at index 0 | |
All methods maintain min heap invariant | |
""" | |
def __init__(self, queue: Optional[Iterable] = None): | |
"""Initialize a MinHeap from a sequence or iterable""" | |
super().__init__(queue) | |
def _lazy_remove(self): | |
"""Execute lazy remove, the actual cleaning step in O(log N) time""" | |
super()._lazy_remove() | |
def heapify(self): | |
"""Construct the min heap itself in linear time""" | |
heapify(self.data) | |
self._lazy_remove() | |
def push(self, element: Any): | |
"""Push an element to the MinHeap""" | |
heappush(self.data, element) | |
def pop(self) -> Any: | |
"""Pops the top min element""" | |
item = heappop(self.data) | |
self._lazy_remove() | |
return item | |
def pushpop(self, element: Any) -> Any: | |
"""Push an element and pop the minimum element""" | |
item = heappushpop(self.data, element) | |
self._lazy_remove() | |
return item | |
def replace(self, element: Any) -> Any: | |
"""Pop the min element and push the new element""" | |
item = heapreplace(self.data, element) | |
self._lazy_remove() | |
return item | |
def siftup(self, pos: int): | |
"""Sift the smaller children up, starting from pos till leaf""" | |
_siftup(self.data, pos) | |
if pos == 0: | |
self._lazy_remove() | |
def siftdown(self, start_pos: int, pos: int): | |
"""Sift the larger parent down, starting from pos till start_pos""" | |
_siftdown(self.data, start_pos, pos) | |
if start_pos == 0: | |
self._lazy_remove() | |
class MaxHeap(_BasePriorityQueue): | |
"""Max Heap -- The max element is always at index 0 | |
All methods maintain max heap invariant | |
""" | |
def __init__(self, queue: Optional[Iterable] = None): | |
"""Initialize a MaxHeap from a sequence or iterable""" | |
super().__init__(queue) | |
def _lazy_remove(self): | |
"""Execute lazy remove, the actual cleaning step in O(log N) time""" | |
_to_remove = self._to_remove | |
while self and self[0] in _to_remove: | |
removed = _heappop_max(self.data) | |
_to_remove.subtract((removed,)) | |
if _to_remove[removed] == 0: | |
del _to_remove[removed] | |
def heapify(self): | |
"""Construct the max heap itself in linear time""" | |
_heapify_max(self.data) | |
self._lazy_remove() | |
def push(self, element: Any): | |
"""Push an element to the MaxHeap""" | |
self.data.append(element) | |
self.siftdown(0, len(self) - 1) | |
def pop(self) -> Any: | |
"""Pops the top max element""" | |
item = _heappop_max(self.data) | |
self._lazy_remove() | |
return item | |
def pushpop(self, element: Any) -> Any: | |
"""Push an element and pop the max element""" | |
if self and self[0] > element: | |
element, self[0] = self[0], element | |
self.siftup(0) | |
return element | |
def replace(self, element: Any) -> Any: | |
"""Pop the max element and push the new element""" | |
item = _heapreplace_max(self.data, element) | |
self._lazy_remove() | |
return item | |
def siftup(self, pos: int): | |
"""Sift the larger children up, starting from pos till leaf""" | |
_siftup_max(self.data, pos) | |
if pos == 0: | |
self._lazy_remove() | |
def siftdown(self, start_pos: int, pos: int): | |
"""Sift the smaller parent down, starting from pos till start_pos""" | |
_siftdown_max(self.data, start_pos, pos) | |
if start_pos == 0: | |
self._lazy_remove() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment