PostgreSQL log parsing into SQL

Load PostgreSQL CSV-format (csvlog) log files into a postgres_log table, then extract the statement duration, query text, and bind parameters from each message into a pg_log_parsed table.
# Load and parse CSV logs from PostgreSQL
import os
import re
import sys

import postgres_copy
import six
import sqlalchemy
# Table layout for PostgreSQL's csvlog output (matches the example table in the
# PostgreSQL documentation; newer server versions add further columns such as backend_type).
CREATE_LOG_TABLE = """
CREATE TABLE IF NOT EXISTS postgres_log
(
    log_time timestamp(3) with time zone,
    user_name text,
    database_name text,
    process_id integer,
    connection_from text,
    session_id text,
    session_line_num bigint,
    command_tag text,
    session_start_time timestamp with time zone,
    virtual_transaction_id text,
    transaction_id bigint,
    error_severity text,
    sql_state_code text,
    message text,
    detail text,
    hint text,
    internal_query text,
    internal_query_pos integer,
    context text,
    query text,
    query_pos integer,
    location text,
    application_name text,
    PRIMARY KEY (session_id, session_line_num)
);
"""
CREATE_PARSED_TABLE = """
CREATE TABLE IF NOT EXISTS pg_log_parsed (
    session_id text,
    session_line_num bigint,
    log_time timestamp(3) with time zone,
    database_name text,
    duration float,
    query text,
    params text,
    PRIMARY KEY (session_id, session_line_num)
);
"""
def _create_tables(engine):
    """Create the raw and parsed log tables if they don't already exist."""
    with engine.begin() as conn:
        conn.execute(CREATE_LOG_TABLE)
        conn.execute(CREATE_PARSED_TABLE)
def _insert_raw_logs(log_path, engine):
    """Bulk-load a single CSV log file into postgres_log using COPY."""
    metadata = sqlalchemy.MetaData(bind=engine)
    metadata.reflect(bind=engine, extend_existing=True)
    table = metadata.tables['postgres_log']
    with open(log_path, 'r') as f:
        postgres_copy.copy_from(
            f, table, engine, format='csv' if six.PY3 else b'csv',
            null='' if six.PY3 else b''
        )
def parse_logs(engine):
    """Extract duration, query text and bind parameters from raw log messages
    and insert them into pg_log_parsed."""
    query_rx = re.compile(r'duration: ([\d.]+).*(?:execute [\w<>]*|statement): (.*)')
    params_rx = re.compile(r'parameters: (.*)')
    with engine.begin() as conn:
        res = conn.execute(
            "select session_id, session_line_num, log_time, database_name, message, detail"
            " from postgres_log where command_tag in ('SELECT', 'COPY')"
        )
        for row in res:
            duration, query, params = None, None, None
            match = query_rx.match(row.message)
            if match:
                duration, query = match.groups()
            if row.detail:
                match = params_rx.match(row.detail)
                if match:
                    params = match.groups()[0]
            conn.execute('insert into pg_log_parsed values (%s, %s, %s, %s, %s, %s, %s)', [
                row.session_id,
                row.session_line_num,
                row.log_time,
                row.database_name,
                float(duration) if duration else None,
                query,
                params,
            ])
if __name__ == '__main__':
    args = sys.argv[1:]
    if len(args) != 2:
        print('Usage: pg_log_loader [path to log dir] [db connection url]')
        sys.exit(1)
    log_path, db_url = args
    engine = sqlalchemy.create_engine(db_url)
    _create_tables(engine)
    for file in os.listdir(log_path):
        fpath = os.path.join(log_path, file)
        if os.path.isfile(fpath) and file.endswith('.csv'):
            _insert_raw_logs(fpath, engine)
    parse_logs(engine)
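
To use the script, point it at a directory of csvlog files and the database to load them into, e.g. python pg_log_loader.py /path/to/pg_log postgresql://localhost/logdb (assuming the file is saved as pg_log_loader.py; the path and connection URL here are only illustrative). Once parsing has run, pg_log_parsed can be queried directly. The query below is a minimal sketch against the schema defined above, ranking statements by total logged duration (durations in PostgreSQL log output are in milliseconds):

-- Illustrative query: slowest statements by total logged duration
SELECT query,
       count(*)      AS calls,
       sum(duration) AS total_ms,
       avg(duration) AS avg_ms
FROM pg_log_parsed
WHERE query IS NOT NULL
GROUP BY query
ORDER BY total_ms DESC
LIMIT 10;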