Skip to content

Instantly share code, notes, and snippets.

@p7g
Created December 3, 2025 02:09
Show Gist options
  • Select an option

  • Save p7g/f9a7cce6d519fac9e6eae437fd85bdbb to your computer and use it in GitHub Desktop.

Select an option

Save p7g/f9a7cce6d519fac9e6eae437fd85bdbb to your computer and use it in GitHub Desktop.
1brc Python 3.14t
import io
import mmap
import os
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
@dataclass(slots=True)
class Stats:
min: float = float("inf")
max: float = float("-inf")
sum: float = 0.0
count: int = 0
class MemoryViewIO(io.RawIOBase):
    """Read-only raw stream that serves its data from a memoryview.

    Wrapping an instance in ``io.BufferedReader`` gives buffered ``read`` /
    ``readline`` access to the viewed memory.

    Args:
        mv: Buffer to read from. Its length and slices are taken in the
            buffer's own item units, so it is assumed to be byte-oriented
            (e.g. a memoryview over ``bytes`` or an ``mmap``).
    """

    __slots__ = "_mv", "_len", "_pos"

    def __init__(self, mv):
        super().__init__()
        self._mv = mv
        self._len = len(mv)
        self._pos = 0  # offset of the next unread byte

    def readable(self):
        """Return True: this stream supports reading."""
        return True

    def readinto(self, b):
        """Copy the next unread bytes into ``b`` and advance the position.

        Args:
            b: Pre-allocated, writable bytes-like object (``bytearray``,
                writable ``memoryview``, ``array.array``, ...).

        Returns:
            The number of bytes written into ``b``; 0 once the end of the
            underlying view has been reached.
        """
        remaining = self._len - self._pos
        if remaining <= 0:
            return 0  # EOF
        dest = b if isinstance(b, memoryview) else memoryview(b)
        if dest.format != "B" or dest.ndim != 1:
            # Buffers with another item type (e.g. array.array("H")) report
            # their length in items and reject byte-typed slice assignment;
            # a flat unsigned-byte view makes sizes and copies byte-exact.
            dest = dest.cast("B")
        n = min(len(dest), remaining)
        dest[:n] = self._mv[self._pos:self._pos + n]
        self._pos += n
        return n
def thread(view: memoryview) -> dict[bytes, Stats]:
    """Aggregate per-station temperature statistics for one chunk of input.

    Args:
        view: The bytes of the chunk. Each line has the form
            ``<station>;<temperature>``; everything after the first ``;``
            must be parseable by ``float``.

    Returns:
        A mapping from the raw station name (bytes) to a ``Stats`` holding
        the minimum, maximum, sum and count of that station's readings
        within this chunk.

    Raises:
        ValueError: If a line contains no ``;`` or its temperature is not a
            valid number.
    """
    stats: dict[bytes, Stats] = {}
    # The context manager closes the reader even when a line fails to parse.
    with io.BufferedReader(MemoryViewIO(view)) as f:
        # readline() returns b"" only at end of stream, which stops iter().
        for line in iter(f.readline, b""):
            station, raw_temp = line.split(b";", 1)
            # float() tolerates the trailing newline left on the line.
            temp = float(raw_temp)
            # Single lookup on the common path (station already seen).
            s = stats.get(station)
            if s is None:
                s = stats[station] = Stats()
            if temp < s.min:
                s.min = temp
            if temp > s.max:
                s.max = temp
            s.sum += temp
            s.count += 1
    return stats
# One worker thread and one chunk per CPU. os.cpu_count() may return None;
# fall back to a single worker instead of relying on an assert, which is
# stripped when Python runs with -O.
cpu_count = os.cpu_count() or 1
measurements_path = Path("data/measurements.txt")
all_stats: dict[bytes, Stats] = {}

with (
    open(measurements_path, "rb") as f,
    # access=ACCESS_READ is accepted on every platform; the prot= keyword
    # exists only in the Unix version of mmap.mmap().
    mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm,
    ThreadPoolExecutor(cpu_count) as pool,
):
    size = mm.size()
    chunk_size = size // cpu_count
    futures: list[Future[dict[bytes, Stats]]] = []
    pos = 0
    for _ in range(cpu_count):
        if pos >= size:
            break  # everything is already assigned; skip no-op chunks
        # Extend the chunk to just past the next newline so that no line is
        # split between two workers.
        next_nl = mm.find(b"\n", pos + chunk_size) + 1
        if next_nl == 0:
            # No newline at or after the target offset: take the rest.
            next_nl = size
        # Slice a temporary memoryview so that no long-lived export keeps
        # the mmap from being closed when the with-block exits.
        futures.append(pool.submit(thread, memoryview(mm)[pos:next_nl]))
        pos = next_nl

    # Fold each worker's partial result into the global table as it arrives.
    for fut in as_completed(futures):
        for station, part in fut.result().items():
            total = all_stats.get(station)
            if total is None:
                all_stats[station] = part
                continue
            if part.min < total.min:
                total.min = part.min
            if part.max > total.max:
                total.max = part.max
            total.sum += part.sum
            total.count += part.count

# Emit "{name=min/mean/max, ...}" with stations ordered by their raw bytes.
entries = []
for station in sorted(all_stats):
    s = all_stats[station]
    avg = s.sum / s.count
    entries.append(f"{station.decode()}={s.min:.1f}/{avg:.1f}/{s.max:.1f}")
print("{" + ", ".join(entries) + "}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment