import logging
import random
import string
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import mysql.connector
from mysql.connector import pooling
# Database connection details
DB_CONFIG = {
    "user": "root",
    "password": "",
    "host": "127.0.0.1",
    "port": 4000,
}

NUM_THREADS = 30   # Number of worker threads
BATCH_SIZE = 1000  # Number of rows per batch INSERT
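
# Note: Connector/Python caps pool_size at 32 (pooling.CNX_POOL_MAXSIZE),
# so NUM_THREADS + 2 = 32 is the largest pool this configuration allows.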
# Create a connection pool
CONNECTION_POOL = pooling.MySQLConnectionPool(
    pool_name="mypool",
    pool_size=NUM_THREADS + 2,
    **DB_CONFIG
)

# Configure logger
def setup_logger(log_to_file=False, log_file='app.log'):
    logger = logging.getLogger('MyLogger')
    logger.setLevel(logging.DEBUG)
    if logger.handlers:  # Avoid duplicate handlers on repeated calls
        logger.handlers.clear()
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    handler = logging.FileHandler(log_file) if log_to_file else logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger

logger = setup_logger(log_to_file=False)
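# Usage note (sketch): pass log_to_file=True to write to a file instead of stdout,
# e.g. logger = setup_logger(log_to_file=True, log_file='load.log')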

# Function to generate random strings
def random_string(length):
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))

# Function to reset the connection pool
def reset_connection_pool():
    global CONNECTION_POOL
    try:
        logger.warning("Reinitializing connection pool...")
        CONNECTION_POOL = pooling.MySQLConnectionPool(
            pool_name="mypool",
            pool_size=NUM_THREADS + 2,
            **DB_CONFIG
        )
        logger.info("Connection pool reinitialized successfully.")
    except Exception as e:
        logger.critical(f"Failed to reinitialize connection pool: {e}")

# Function to create a schema and populate it
def create_schema(schema_name, num_tables, rows_per_table):
    while True:  # Retry until the schema is created successfully
        try:
            conn = CONNECTION_POOL.get_connection()
            cursor = conn.cursor()
            cursor.execute(f"DROP DATABASE IF EXISTS {schema_name}")
            cursor.execute(f"CREATE DATABASE {schema_name}")
            cursor.execute(f"USE {schema_name}")
            for i in range(1, num_tables + 1):
                table_name = f"sbtest{i}"
                logger.info(f"Schema {schema_name}: Creating table {table_name} with {rows_per_table} rows...")
                cursor.execute(f"""
                    CREATE TABLE {table_name} (
                        id BIGINT NOT NULL AUTO_INCREMENT,
                        k INT NOT NULL DEFAULT '0',
                        c CHAR(120) NOT NULL DEFAULT '',
                        pad CHAR(60) NOT NULL DEFAULT '',
                        PRIMARY KEY (id),
                        KEY k_1 (k)
                    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
                """)
                insert_query = f"INSERT INTO {table_name} (k, c, pad) VALUES (%s, %s, %s)"
                batch_data = []
                for _ in range(rows_per_table):
                    batch_data.append((random.randint(1, 100000), random_string(120), random_string(60)))
                    if len(batch_data) >= BATCH_SIZE:
                        cursor.executemany(insert_query, batch_data)
                        conn.commit()  # Commit each batch to keep transactions small
                        batch_data.clear()
                if batch_data:  # Insert remaining rows
                    cursor.executemany(insert_query, batch_data)
                    conn.commit()
            logger.info(f"Created schema: {schema_name}")
            return True
        except pooling.PoolError as e:
            logger.error(f"Connection pool exhausted while creating schema {schema_name}: {e}. Resetting pool...")
            reset_connection_pool()
        except mysql.connector.Error as e:
            logger.error(f"Error creating schema {schema_name}: {e}. Retrying...")
            time.sleep(1)
        except Exception as e:
            logger.critical(f"Unexpected error: {e}. Retrying...")
            time.sleep(1)
        finally:
            # Best-effort cleanup so a dead connection can't break the retry loop
            try:
                if 'cursor' in locals():
                    cursor.close()
                if 'conn' in locals() and conn.is_connected():
                    conn.close()
            except Exception:
                pass

# Main function to create schemas in parallel
def create_all_schemas(start, end, num_threads, tables_per_schema, rows_per_table):
    schemas = [f"sbtest{i}" for i in range(start, end + 1)]
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        future_to_schema = {
            executor.submit(create_schema, schema, tables_per_schema, rows_per_table): schema
            for schema in schemas
        }
        for future in as_completed(future_to_schema):
            schema_name = future_to_schema[future]
            try:
                future.result()
            except Exception as e:
                logger.error(f"Failed to create schema {schema_name}: {e}")

if __name__ == "__main__":
    START_SCHEMA = 1
    END_SCHEMA = 30            # 30 schemas x ~3 GB each, ~90 GB total
    TABLES_PER_SCHEMA = 3      # ~3 GB per schema
    ROWS_PER_TABLE = 1000000   # ~1 GB per table
    logger.info(f"Starting to create schemas from sbtest{START_SCHEMA} to sbtest{END_SCHEMA} using {NUM_THREADS} threads...")
    create_all_schemas(START_SCHEMA, END_SCHEMA, NUM_THREADS, TABLES_PER_SCHEMA, ROWS_PER_TABLE)
    logger.info("All schemas created.")