Skip to content

Instantly share code, notes, and snippets.

@Gaarv
Forked from dazfuller/Pipfile
Created October 26, 2024 12:09
Show Gist options
  • Save Gaarv/a4343348d89683b165cc5150756541d8 to your computer and use it in GitHub Desktop.
Multi-process loading of data into MySQL from Python
import csv
import logging
import multiprocessing as mp
from contextlib import contextmanager
from pathlib import Path
from typing import Dict
from mysql.connector import MySQLConnection
from mysql.connector.cursor import MySQLCursor
import mysql.connector
logging.basicConfig(level=logging.DEBUG)
# Connection settings for an Azure Database for MySQL instance.
# NOTE(review): placeholder credentials — replace before running; prefer
# loading secrets from the environment rather than committing them here.
config = {
    'host': '<server>.mysql.database.azure.com',
    'username': 'user',
    'password': 'password',
    'database': 'db_name',
    'autocommit': False  # batches are committed explicitly in worker()
}
@contextmanager
def mysql_connect(connection_config: Dict) -> MySQLConnection:
    """Open a MySQL connection as a context manager.

    Args:
        connection_config: keyword arguments for ``mysql.connector.connect``
            (host, username, password, database, ...).

    Yields:
        An open ``MySQLConnection``.

    The close is in a ``finally`` block: without it, an exception raised in
    the ``with`` body propagates out of the ``yield`` and the connection is
    never closed (a leak in the original version).
    """
    conn: MySQLConnection = mysql.connector.connect(**connection_config)
    logging.info('Connected to database \'%s\' at \'%s\'', connection_config['database'], connection_config['host'])
    try:
        yield conn
    finally:
        logging.info('Closing connection to server \'%s\'', connection_config['host'])
        conn.close()
@contextmanager
def mysql_cursor(conn: MySQLConnection) -> MySQLCursor:
    """Yield a cursor from *conn*, guaranteeing it is closed afterwards.

    The close is in a ``finally`` block: the original only closed the cursor
    on the success path, so an exception inside the ``with`` body leaked it.
    """
    cur: MySQLCursor = conn.cursor()
    try:
        yield cur
    finally:
        logging.info('Closing cursor')
        cur.close()
# Parameterized INSERT for the parking_citations table; executed via
# cursor.executemany() in worker(), one %s placeholder per column.
insert_sql = """
INSERT INTO parking_citations (
TicketNumber
, IssueDate
, IssueTime
, MeterId
, MarkedTime
, RPStatePlate
, PlateExpiryDate
, VIN
, Make
, BodyStyle
, Color
, Location
, Route
, Agency
, ViolationCode
, ViolationDescription
, FineAmount
, Latitude
, Longitude
) VALUES (
%s
, %s
, %s
, %s
, %s
, %s
, %s
, %s
, %s
, %s
, %s
, %s
, %s
, %s
, %s
, %s
, %s
, %s
, %s
)
"""
def worker(rows):
    """Open a fresh connection, bulk-insert one batch of rows, and commit."""
    with mysql_connect(config) as connection, mysql_cursor(connection) as cur:
        logging.info('Processing batch')
        cur.executemany(insert_sql, rows)
        connection.commit()
def get_chunks(batch_size: int, source_file: Path):
    """Yield lists of at most *batch_size* rows read from a CSV file.

    The header row is skipped, and empty-string fields are mapped to None
    so executemany() inserts SQL NULL instead of ''.

    Args:
        batch_size: number of rows per yielded batch (last batch may be short).
        source_file: path to a comma-delimited CSV file with a header row.
    """
    # newline='' is what the csv module requires for correct newline/quoting
    # handling on all platforms.
    with open(source_file, 'r', newline='') as f:
        csv_reader = csv.reader(f, delimiter=',')
        next(csv_reader, None)  # skip the header row
        batch_data = []
        for row in csv_reader:
            # Fix: the original used `v is not ''` — an identity comparison
            # that only works by accident of string interning (and is a
            # SyntaxWarning in modern CPython). Equality is the correct test.
            batch_data.append([v if v != '' else None for v in row])
            if len(batch_data) == batch_size:
                yield batch_data
                batch_data = []
        # Flush the final partial batch, if any.
        if batch_data:
            yield batch_data
def main():
    """Load the citations CSV into MySQL using a pool of worker processes."""
    batch_size = 5000
    source_file = Path('data/parking-citations.csv')
    chunk_gen = get_chunks(batch_size, source_file)
    # Leave one core free for the feeder, but never ask for a zero-sized
    # pool: on a single-core machine cpu_count()-1 == 0 and Pool(0) raises.
    process_count = max(1, mp.cpu_count() - 1)
    with mp.Pool(process_count) as pool:
        # imap is lazy — the original never consumed it, so any exception
        # raised inside worker() was silently discarded. Draining the
        # iterator surfaces worker failures here.
        for _ in pool.imap(worker, chunk_gen):
            pass
        pool.close()
        pool.join()
if __name__ == '__main__':
    main()
# Pipfile accompanying the loader script: mysql-connector-python is the
# only runtime dependency; the interpreter is pinned to Python 3.6.
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

# No development-only dependencies.
[dev-packages]

[packages]
mysql-connector-python = "*"

[requires]
python_version = "3.6"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment