import threading
import mysql.connector
from mysql.connector import Error
import random
import string
import time
from collections import Counter
import logging
from logging.handlers import RotatingFileHandler
'''
Setup: run once against the cluster before starting this script.

SET GLOBAL tidb_gc_life_time = '24h';

CREATE TABLE sbtest (
    id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
    k INT NOT NULL DEFAULT 0,
    c CHAR(120) NOT NULL DEFAULT '',
    pad CHAR(60) NOT NULL DEFAULT '',
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    KEY (created_at)
);
'''
def setup_logger(log_file='app.log', max_bytes=500 * 1024 * 1024, backup_count=1):
    """
    Sets up a logger with a rotating file handler.

    :param log_file: Name of the log file.
    :param max_bytes: Maximum file size before rotation (in bytes).
    :param backup_count: Number of backup log files to keep. Must be at least 1:
        RotatingFileHandler never rolls over when backupCount is 0, so max_bytes
        would have no effect.
    :return: Configured logger instance.
    """
    logger = logging.getLogger('AppLogger')
    logger.setLevel(logging.INFO)
    # Check if handlers are already set up to avoid duplicate logging
    if not logger.handlers:
        handler = RotatingFileHandler(log_file, maxBytes=max_bytes, backupCount=backup_count)
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger
# Module-level shared state: counters and logger used by all threads
stats_counter = Counter()
logger = setup_logger(log_file='ime_test.log')
# TiDB connection parameters (TiDB's MySQL-compatible port defaults to 4000)
dbconfig = {
    "database": "test",
    "user": "root",
    "password": "",
    "host": "127.0.0.1",
    "port": "4000"
}
# Generate a random alphanumeric string of the given length
def random_string(length):
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
# Insert rows shaped like Sysbench's test table, throttled to roughly
# rows_per_second, with created_at backdated by days_before days.
def insert_rows_with_throttle(rows_per_second, total_rows, days_before):
    # Initialize connection and cursor variables
    conn = None
    cursor = None
    try:
        # Connect to the database
        conn = mysql.connector.connect(**dbconfig)
        cursor = conn.cursor()
        # Sleeping `interval` seconds after each batch of batch_size rows
        # yields approximately rows_per_second overall.
        batch_size = 10  # Number of rows per batch
        interval = batch_size / rows_per_second  # Seconds between batches
        # Insert rows in batches
        inserted_rows = 0
        while inserted_rows < total_rows:
            batch_values = []
            for _ in range(batch_size):
                k = random.randint(1, 100000)
                c = random_string(120)
                pad = random_string(60)
                batch_values.append((k, c, pad))
            # Insert the batch into the table
            query = (
                "INSERT INTO sbtest (k, c, pad, created_at) "
                f"VALUES (%s, %s, %s, DATE_SUB(NOW(), INTERVAL {days_before} DAY))"
            )
            cursor.executemany(query, batch_values)
            conn.commit()
            inserted_rows += batch_size
            time.sleep(interval)  # Throttle the inserts
        logger.info(f"Inserted {total_rows} rows.")
    except Exception as e:
        logger.error(f"Exception: {e}")
    finally:
        # Close cursor and connection safely
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
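# Example (mirrors the thread workloads below): insert 1000 rows at roughly
# 400 rows/s, backdated by 2 days:
#
#     insert_rows_with_throttle(400, 1000, 2)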
# Select a batch of expired rows (older than 2 days + 30 minutes), delete
# them by primary key, and track how often the SELECT runs slowly.
def select_and_delete():
    # Initialize connection and cursor variables
    conn = None
    cursor = None
    time.sleep(5)
    try:
        # Connect to the database
        conn = mysql.connector.connect(**dbconfig)
        cursor = conn.cursor(dictionary=True)  # Fetch results as dictionaries
        # Select the expired rows and time the query
        start_time = time.time()
        select_query = (
            "SELECT * FROM sbtest "
            "WHERE created_at < DATE_SUB(DATE_SUB(NOW(), INTERVAL 2 DAY), INTERVAL 30 MINUTE) "
            "ORDER BY created_at LIMIT 2500"
        )
        cursor.execute(select_query)
        rows = cursor.fetchall()  # Fetch the selected rows
        execution_time = time.time() - start_time
        stats_counter['SELECT total executions'] += 1
        if execution_time > 0.3:
            stats_counter['SELECT slow count'] += 1
            logger.info(
                f"SELECT Execution Time: {execution_time:.6f} seconds. "
                f"{stats_counter['SELECT slow count']} of {stats_counter['SELECT total executions']}"
            )
        if not rows:
            logger.info("No rows found to delete.")
            return
        # Extract IDs of the rows to delete
        ids_to_delete = [row['id'] for row in rows]
        # Delete the selected rows
        delete_query = "DELETE FROM sbtest WHERE id IN (%s)" % (
            ', '.join(['%s'] * len(ids_to_delete))
        )
        cursor.execute(delete_query, ids_to_delete)
        conn.commit()
        logger.info(f"Deleted {cursor.rowcount} rows.")
    except Exception as e:
        logger.error(f"Exception: {e}")
    finally:
        # Close cursor and connection safely
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
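# Design note: selecting ids first and then deleting by primary key keeps the
# DELETE itself to point lookups. A single-statement alternative (sketch only;
# MySQL and TiDB both accept ORDER BY/LIMIT on single-table DELETE) would be:
#
#     DELETE FROM sbtest
#     WHERE created_at < DATE_SUB(DATE_SUB(NOW(), INTERVAL 2 DAY), INTERVAL 30 MINUTE)
#     ORDER BY created_at LIMIT 2500;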
# A stoppable thread that runs its workload repeatedly for a fixed duration
class TimedThread(threading.Thread):
    def __init__(self, runtime_days, name, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.runtime_seconds = runtime_days * 24 * 60 * 60  # Convert days to seconds
        self._stop_event = threading.Event()
        self.name = name

    def stop(self):
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()

    def run(self):
        logger.info(f"{self.name} has started")
        start_time = time.time()
        while not self.stopped() and (time.time() - start_time) < self.runtime_seconds:
            if self.name == "Thread-1":
                # Fresh rows: ~400 rows/s, 1000 rows per pass
                insert_rows_with_throttle(400, 1000, 0)
            elif self.name == "Thread-2":
                # Purge rows that have aged past the retention window
                select_and_delete()
            elif self.name == "Thread-3":
                # Backdated rows (2 days old), immediately eligible for deletion
                insert_rows_with_throttle(400, 1000, 2)
        logger.info(f"{self.name} has stopped.")
# Create and start the threads
thread1 = TimedThread(runtime_days=7, name="Thread-1")
thread2 = TimedThread(runtime_days=7, name="Thread-2")
thread3 = TimedThread(runtime_days=2, name="Thread-3")
threads = [thread1, thread2, thread3]

# Start all threads
for thread in threads:
    thread.start()

# Wait for all threads to finish
for thread in threads:
    thread.join()
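# Once all threads have finished, emit a final summary of the counters that
# select_and_delete maintained (an optional wrap-up, assuming the log file is
# the desired destination).
for key, value in stats_counter.items():
    logger.info(f"{key}: {value}")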