# Source: GitHub Gist radupotop/6a7023a8b79d39853907ed340ccc2134
# Author: @radupotop. Created February 20, 2026 14:59.
from __future__ import annotations
import heapq
import itertools
from dataclasses import dataclass
from datetime import datetime
from typing import TypeAlias
@dataclass(frozen=True)
class Task:
    """Immutable record describing one unit of work.

    Instances are frozen, so fields cannot be reassigned after construction
    and instances are hashable and comparable by value.
    """

    # Identifier of the user the task belongs to.
    user_id: int
    # Name of the provider the task targets.
    provider: str
    # Moment associated with the task.
    timestamp: datetime
# Type aliases used by the queue below.

# Scalar building blocks.
Timestamp: TypeAlias = datetime
Seq: TypeAlias = int  # monotonically increasing insertion counter
Version: TypeAlias = int  # identifies which heap entry for a key is current
Key: TypeAlias = tuple[int, str]  # (user_id, provider)

# Composite shapes: one heap record, and the per-key "currently live" table.
HeapEntry: TypeAlias = tuple[Timestamp, Seq, Key, Version, Task]
ActiveMap: TypeAlias = dict[Key, tuple[Version, Task]]
class UniqueTaskQueue:
    """Priority queue that keeps at most one pending task per key.

    The key of a task is ``(task.user_id, task.provider)``. Tasks are handed
    out in ascending ``timestamp`` order; ties are broken by insertion order.

    Replacing a pending task does not remove its old heap entry. Instead every
    accepted task gets a fresh version number, and ``get`` discards heap
    entries whose version no longer matches the one recorded for their key.
    """

    def __init__(self) -> None:
        """Create an empty queue."""
        # Min-heap of (timestamp, seq, key, version, task); may hold stale
        # entries that have been superseded or whose key was already taken.
        self._heap: list[HeapEntry] = []
        # Unique tie-breaker so heap comparisons never reach key/task.
        self._seq = itertools.count()
        # Shared across all keys, so a version is never reused, even after
        # a key has been removed and later re-added.
        self._version = itertools.count(1)
        # key -> (version, task) for the single live task of each key.
        self._active: ActiveMap = {}

    @staticmethod
    def _key(task: Task) -> Key:
        """Return the uniqueness key ``(user_id, provider)`` of *task*."""
        return (task.user_id, task.provider)

    @staticmethod
    def _wins_over(new: Task, old: Task) -> bool:
        """Return True when *new* should replace *old* for the same key."""
        # older timestamp => higher priority
        return new.timestamp < old.timestamp

    def put(self, task: Task) -> bool:
        """Offer *task* to the queue.

        Returns:
            True if the task was accepted: either its key had no pending task,
            or its timestamp is strictly older than the pending one (which it
            then replaces). False if an equal-or-older task is already pending
            for the key; the queue is left unchanged in that case.
        """
        key = self._key(task)
        current = self._active.get(key)
        if current is not None and not self._wins_over(task, current[1]):
            return False

        version = next(self._version)
        self._active[key] = (version, task)
        heapq.heappush(
            self._heap, (task.timestamp, next(self._seq), key, version, task)
        )
        return True

    def get(self) -> Task:
        """Remove and return the pending task with the oldest timestamp.

        Raises:
            IndexError: if no task is pending.
        """
        while self._heap:
            _, _, key, version, task = heapq.heappop(self._heap)
            active = self._active.get(key)
            # Skip entries that were superseded or whose key is already gone.
            if active is None or active[0] != version:
                continue
            del self._active[key]
            if not self._active:
                # Nothing is live any more, so whatever remains in the heap is
                # stale; drop it now instead of holding on to dead tasks.
                self._heap.clear()
            return task
        raise IndexError("Queue is empty")

    def __len__(self) -> int:
        """Return the number of pending tasks (one per distinct key)."""
        return len(self._active)

    def empty(self) -> bool:
        """Return True when no task is pending."""
        return not self._active
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.