Created
January 17, 2022 06:06
-
-
Save georgexsh/2291d0719d28b44a054642098fbc79fc to your computer and use it in GitHub Desktop.
sqlite_kv_concurrent.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import sqlite3 | |
import os | |
import random | |
import multiprocessing | |
class Store1: | |
"""sharding to tables""" | |
def __init__(self, filename="kv.db", buckets=10): | |
self.buckets = buckets | |
self.conn = sqlite3.connect(filename, timeout=60) | |
self.conn.execute("pragma journal_mode=wal") | |
for n in range(buckets): | |
self.conn.execute( | |
f'create table if not exists "kv_{n}" (key integer primary key, value integer) without rowid' | |
) | |
self.conn.commit() | |
def _get_table(self, key): | |
assert isinstance(key, int) | |
return f"kv_{key % self.buckets}" | |
def get(self, key): | |
item = self.conn.execute( | |
f'select value from "{self._get_table(key)}" where key=?', (key,) | |
) | |
if item: | |
return next(item)[0] | |
def set(self, key, value): | |
self.conn.execute( | |
f'replace into "{self._get_table(key)}" (key, value) values (?,?)', | |
(key, value), | |
) | |
self.conn.commit() | |
class Store0: | |
"""no sharding""" | |
def __init__(self, filename="kv.db"): | |
self.conn = sqlite3.connect(filename, timeout=60) | |
self.conn.execute("pragma journal_mode=wal") | |
self.conn.execute( | |
'create table if not exists "kv" (key integer primary key, value integer) without rowid' | |
) | |
self.conn.commit() | |
def get(self, key): | |
item = self.conn.execute('select value from "kv" where key=?', (key,)) | |
if item: | |
return next(item)[0] | |
def set(self, key, value): | |
self.conn.execute('replace into "kv" (key, value) values (?,?)', (key, value)) | |
self.conn.commit() | |
class Store: | |
"""sharding to files""" | |
def __init__(self, buckets=5): | |
self.buckets = buckets | |
self.conns = [] | |
for n in range(buckets): | |
conn = sqlite3.connect(f"kv_{n}.db", timeout=60) | |
conn.execute("pragma journal_mode=wal") | |
conn.execute( | |
'create table if not exists "kv" (key integer primary key, value integer) without rowid' | |
) | |
conn.commit() | |
self.conns.append(conn) | |
def _get_conn(self, key): | |
assert isinstance(key, int) | |
return self.conns[key % self.buckets] | |
def get(self, key): | |
item = self._get_conn(key).execute('select value from "kv" where key=?', (key,)) | |
if item: | |
return item.fetchall() | |
# return next(item)[0] | |
def set(self, key, value): | |
conn = self._get_conn(key) | |
conn.execute('replace into "kv" (key, value) values (?,?)', (key, value)) | |
conn.commit() | |
n = 10000 | |
d = [random.randint(0, 1 << 20) for _ in range(n)] | |
print("global", os.getpid(), sum(d)) | |
random.shuffle(d) | |
def init(cls): | |
s = cls() | |
for i in d: | |
s.set(i, i) | |
def worker(cls, n=None): | |
s = cls() | |
# print(os.getpid(), sum(d)) | |
for i in d: | |
s.get(i) | |
# print(r) | |
def test(cls, c): | |
worker(cls) # warm up | |
start = time.time() | |
ps = [] | |
for _ in range(c): | |
p = multiprocessing.Process(target=worker, args=(cls,)) | |
p.start() | |
ps.append(p) | |
while any(p.is_alive() for p in ps): | |
time.sleep(0.01) | |
cost = time.time() - start | |
print(f"{c:<10d}\t{cost:<7.2f}\t{n/cost:<20.2f}\t{n*c/cost:<14.2f}") | |
def main(): | |
multiprocessing.set_start_method("fork") | |
cls = Store | |
# cls = Store1 | |
init(cls) | |
print("concurrency\ttime(s)\tpre process TPS(r/s)\ttotal TPS(r/s)") | |
for c in range(1, 21): | |
test(cls, c) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment