Skip to content

Instantly share code, notes, and snippets.

@yjzhang
Created October 16, 2025 10:23
Show Gist options
  • Select an option

  • Save yjzhang/e4c41c3bc97f9770dc1a5d28fd7049d1 to your computer and use it in GitHub Desktop.

Select an option

Save yjzhang/e4c41c3bc97f9770dc1a5d28fd7049d1 to your computer and use it in GitHub Desktop.
cvm.py - implementation of the stream cardinality estimation/distinct elements counting algorithm from https://arxiv.org/pdf/2301.10191
#! /usr/bin/env python
# Python implementation of the CVM algorithm
# ref: https://arxiv.org/pdf/2301.10191
# https://www.quantamagazine.org/computer-scientists-invent-an-efficient-new-way-to-count-20240516/
# https://www-cs-faculty.stanford.edu/~knuth/papers/cvm-note.pdf
import heapq
import random
# Algorithm 1 from CVM
def cvm_count(stream, threshold=100):
    """Estimate the number of distinct elements in ``stream`` (CVM Algorithm 1).

    A random sample of the elements seen so far is kept in a set. Each element
    is (re)sampled with the current probability ``p``; whenever the sample
    reaches ``threshold`` entries, every entry is dropped with probability 1/2
    and ``p`` is halved.

    Args:
        stream: Iterable of hashable items.
        threshold: Size at which the sample is thinned; the sample never holds
            ``threshold`` or more items between stream elements. With
            ``threshold=1`` the sample is always emptied, so use 2 or more
            for a meaningful estimate.

    Returns:
        The estimate ``len(sample) / p`` as a float. The count is exact while
        fewer than ``threshold`` distinct items have been seen.

    Raises:
        ValueError: If ``threshold`` is smaller than 1.
    """
    if threshold < 1:
        raise ValueError("threshold must be at least 1")
    p = 1.0
    elements = set()
    for e in stream:
        # Only the most recent occurrence of e decides whether it is sampled.
        elements.discard(e)
        if random.random() < p:
            elements.add(e)
        # Repeat the thinning round until the sample is below the threshold.
        # A single round may remove nothing, which would otherwise let the
        # sample grow past the threshold and never be thinned again.
        while len(elements) >= threshold:
            elements = {x for x in elements if random.random() >= 0.5}
            p /= 2
    return len(elements) / p
# definition from Knuth
def cvm_count_k(stream, s=100):
    """Estimate the number of distinct elements in ``stream`` (Knuth's variant).

    The buffer holds at most ``s`` pairs ``(r, element)``, where ``r`` is the
    random draw made at the element's latest occurrence. When the buffer is
    full, the pair with the largest draw competes with the newcomer: the
    larger of the two draws becomes the new ``p`` and the smaller one stays.

    Args:
        stream: Iterable of items; they only need to support ``!=``.
        s: Maximum number of pairs kept in the buffer.

    Returns:
        The estimate ``len(buffer) / p`` as a float. The count is exact while
        no more than ``s`` distinct items have been seen.

    Raises:
        ValueError: If ``s`` is smaller than 1.
    """
    if s < 1:
        raise ValueError("s must be at least 1")
    p = 1.0
    buffer = []
    for e in stream:
        # Drop any earlier pair for e; only its latest draw counts.
        buffer = [pair for pair in buffer if pair[1] != e]
        r = random.random()
        if r > p:
            continue
        if len(buffer) < s:
            buffer.append((r, e))
            continue
        # Buffer is full: find the pair with the largest draw (first on ties).
        max_index = max(range(len(buffer)), key=lambda i: buffer[i][0])
        max_r = buffer[max_index][0]
        if r > max_r:
            # The newcomer has the largest draw, so it is the one left out.
            p = r
        else:
            p = max_r
            buffer[max_index] = (r, e)
    return len(buffer) / p
# using a heap rather than a linear search to find the largest value
def cvm_count_k_heap(stream, s=100):
    """Estimate the number of distinct elements in ``stream`` using a heap.

    Same procedure as the list-based Knuth variant, but the buffer is a heap
    of ``(-r, element)`` entries so the entry with the largest draw ``r`` is
    always at index 0 (``heapq`` implements a min-heap, hence the negation).

    Args:
        stream: Iterable of items; they only need to support ``!=``.
        s: Maximum number of entries kept in the buffer.

    Returns:
        The estimate ``len(buffer) / p`` as a float. The count is exact while
        no more than ``s`` distinct items have been seen.

    Raises:
        ValueError: If ``s`` is smaller than 1.
    """
    if s < 1:
        raise ValueError("s must be at least 1")
    p = 1.0
    heap = []
    for e in stream:
        # Drop any earlier entry for e; only its latest draw counts.
        kept = [entry for entry in heap if entry[1] != e]
        if len(kept) != len(heap):
            # Deleting from the middle of the list shifts entries and breaks
            # the heap invariant, so rebuild it before using index 0 again.
            heapq.heapify(kept)
        heap = kept
        r = random.random()
        if r > p:
            continue
        if len(heap) < s:
            heapq.heappush(heap, (-r, e))
            continue
        # Buffer is full: the root carries the largest draw.
        largest_r = -heap[0][0]
        if r > largest_r:
            # The newcomer has the largest draw, so it is the one left out.
            p = r
        else:
            p = largest_r
            # Pop the root and push the newcomer in a single step.
            heapq.heapreplace(heap, (-r, e))
    return len(heap) / p
if __name__ == '__main__':
    # Demo: run each estimator on a stream of consecutive integers (all
    # distinct) and print the estimate, one per line.
    demo_runs = (
        (cvm_count, 1000),
        (cvm_count, 10000),
        (cvm_count_k, 1000),
        (cvm_count_k_heap, 1000),
    )
    for estimator, distinct in demo_runs:
        print(estimator(range(distinct)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment