Last active
February 18, 2021 02:50
-
-
Save gullevek/c3baa7b777428ba90f62 to your computer and use it in GitHub Desktop.
Writes all data from a given query to a CSV file. Useful for reading out large batches of data while avoiding excessive memory usage. Uses async and CURSOR. The Python version only uses CURSOR, because named cursors and async connections do not work at the same time.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/perl | |
# AUTHOR: Clemens Schwaighofer | |
# DATE: 2015/8/6 | |
# DSCRIPTION: | |
# Runs a query from a file or command line and outputs the data to a CSV file | |
# Runs query async/and CURSOR | |
use strict; | |
use warnings; | |
no strict 'refs'; # I need to allow dynamic references in this script | |
use utf8; | |
BEGIN { | |
use POSIX qw(floor); | |
use DBD::Pg ':async'; | |
use Text::CSV_XS; | |
use Getopt::Long; | |
use Time::HiRes qw(time); | |
use Number::Format qw(format_number); | |
# assign a function to INTERRUPT | |
$SIG{INT} = \&tsktsk; | |
} | |
# in case a ^C is cought, cancel all remote running querries before exiting | |
# SIGINT (^C) handler: cancels any remote running queries and releases all
# resources before exiting. Uses fully-qualified main:: globals because it
# can fire at any point of the run, outside any lexical scope.
sub tsktsk
{
	# re-assign the handler to itself (old-style reliable signal handling)
	$SIG{INT} = \&tsktsk;
	warn "\nPGSQL async: ".$main::dbh->{'pg_async_status'}."\n";
	# cancel a still-running async query, if there is one
	$main::dbh->pg_cancel() if ($main::dbh->{'pg_async_status'});
	# close the server-side cursor, if it was already declared
	$main::dbh->do($main::close_query) || print "Cursor '".$main::cursor_name."' not yet declared\n"; # close any open cursors
	# finish all prepared statement handles
	foreach my $my_cur (keys %main::cur) {
		$main::cur{$my_cur}->finish;
	}
	# finish the FETCH statement handle, if reading already started
	$main::sth_read->finish if ($main::sth_read);
	# close all DB connections
	$main::dbh->disconnect();
	# close the CSV output file handle
	close($main::CSV_DATA);
	# die, so the script does not continue
	die "Exit via ^C\n";
}
# METHOD: check_output_file | |
# PARAMS: file name | |
# RETURN: none | |
# DESC : removes the file if not data was written | |
# METHOD: check_output_file
# PARAMS: file name
# RETURN: none
# DESC  : deletes the output file again if nothing was written to it
sub check_output_file
{
	my ($file_name) = @_;
	# -s is false for a missing or zero byte file; keep anything with content
	return if (-s $file_name);
	print_out("Delete file $file_name because no data was found", 1);
	unlink($file_name);
}
# METHOD: convert_time | |
# PARAMS: timestamp, 1/0 for micro time output | |
# RETURN: formated string of the input timestamp in days, hours, minutes, seconds and optional micorseconds | |
# DESC : make time string from seconds interval timestamp | |
# METHOD: convert_time
# PARAMS: timestamp (seconds interval), 1/0 for micro time output
# RETURN: formatted string in days, hours, minutes, seconds and optional fraction
# DESC  : make a human readable time string from a seconds interval
sub convert_time
{
	my ($timestamp, $show_micro) = @_;
	# round to four fractional digits, then separate whole seconds from fraction
	my ($seconds, $fraction) = split(/\./, sprintf("%.4f", $timestamp));
	# break the whole seconds down into days, hours, minutes, seconds
	my @units = ();
	foreach my $step (86400, 3600, 60, 1) {
		push(@units, floor($seconds / $step));
		$seconds %= $step;
	}
	# assemble output; higher units are skipped while they are zero,
	# but always shown once a larger unit is present
	my $formatted = '';
	$formatted .= $units[0].'d ' if ($units[0]);
	$formatted .= $units[1].'h ' if ($units[0] || $units[1]);
	$formatted .= $units[2].'m ' if ($units[0] || $units[1] || $units[2]);
	$formatted .= $units[3].'s';
	$formatted .= ' '.($fraction ? $fraction : 0).'ms' if ($show_micro);
	return $formatted;
}
# converts bytes to human readable format | |
sub convert_number | |
{ | |
my ($number) = @_; | |
my $pos; # the original position in the labels array | |
$number = 0 if (!$number); | |
# divied number until its division would be < 1024. count that position for label usage | |
for ($pos = 0; $number > 1024; $pos ++) { | |
$number = $number / 1024; | |
} | |
# before we return it, we format it [rounded to 2 digits, if has decimals, else just int] | |
# we add the right label to it and return | |
return sprintf(!$pos ? '%d' : '%.2f', $number).qw(B KB MB GB TB PB EB)[$pos]; | |
} | |
# METHOD: print_out | |
# PARAMS: message, verbose level, no line break | |
# RETURN: n/a | |
# DESC : prints out the message based on the global verbose level | |
# METHOD: print_out
# PARAMS: message, verbose level, no line break flag
# RETURN: n/a
# DESC  : prints the message if the global verbose level is at least v_level;
#         a line break is appended unless the no-line-break flag is set
sub print_out
{
	my ($message, $v_level, $no_lb) = @_;
	# below the current global verbosity: stay silent
	return if ($main::verbose < $v_level);
	print $message.($no_lb ? '' : "\n");
}
# ==== SETUP & COMMAND LINE ====
# no buffering for output
$| ++;
binmode STDOUT, ":encoding(utf8)";
binmode STDIN, ":encoding(utf8)";
binmode STDERR, ":encoding(utf8)";
my $error = 0;
my %opt = ();
our $verbose = 0;
my $query = '';
my $output_file = '';
my $db_connect_string = '';
my $no_async = 0;
my $no_declare = 0;
my $wait_time = 10; # wait time for a finish check. default is 10 seconds
# allow option bundling (eg -qqq)
Getopt::Long::Configure ("bundling");
# command line
my $result = GetOptions(\%opt,
	'q|query=s' => \$query,
	'o|output=s' => \$output_file,
	'd|db=s' => \$db_connect_string,
	'w|wait=s' => \$wait_time,
	'no-async' => \$no_async, # do not run queries async
	#'no-declare' => \$no_declare, # do not collect data server side
	'verbose|v+' => \$verbose,
	'help' # just help
) || exit 1;
if ($opt{'help'}) {
	print "Possible options\n";
	print "--query|--q <file or query>\tCan either be a file that has the query inside or the query itself in a string\n";
	print "--output|--o <output file name>\tThe data read from the query is written into this file including headers\n";
	print "--db|-d <db connect string>\tConnection data in the following format: user=;pass=;dbname=;host=;port=\n";
	print "--wait|-w <time in seconds>\tOverride the default wait time of 10 seconds, Needs to be in range of 1 to 60\n";
	print "--no-async\tDon't run the query in async form\n";
	#print "--no-declare\tDon't run DECLARE on the server and collect data local\n";
	print "--verbose|-v [--verbose|-v ...]\tShow more info, at least one -v has to be given to see standard percent output. not needed for --list\n";
	print "--help\t this page\n";
	print "\n";
	exit 0;
}
# ==== PARAMETER VALIDATION ====
# query must be a readable file, a pipe on STDIN, or a direct query string
if (! -f $query && -t STDIN) {
	print "Please give a file with the query, use STDIN (pipe) or the query itself with the --query parameter\n";
	$error = 1;
}
if (!$output_file) {
	print "Please give a target output file with the --output paramter\n";
	$error = 1;
}
if (!$db_connect_string) {
	print "Please give the db connection string with the --db paramter\n";
	$error = 1;
}
# connect string needs at least a user and a database name
if ($db_connect_string !~ /user=([^;.]*);?/ && $db_connect_string !~ /dbname=([^;.]*);?/) {
	print "The db connection string needs at least a username and database name\n";
	$error = 1;
}
if ($wait_time < 1 || $wait_time > 60) {
	print "Wait time needs to be a value between 1 and 60\n";
	$error = 1;
}
# exit if error
exit 1 if ($error);
# ==== GLOBAL VARIABLES ====
# input/output encoding for files
my $encoding = 'utf8';
# db stuff
my $dsn;
my $db_user;
my $db_pass;
our $dbh; # database handle (package global so the ^C handler can reach it)
# query
my %_query = ();
our %cur = ();
our $sth_read;
# cursor name with timestamp attached so it is unique per run
our $cursor_name = 'csr_'.join('_', split(/\./, time()));
# for cursor query, this is done for the big reads
my $q_name = 'read_data';
my $do_query = "DECLARE ".$cursor_name." CURSOR WITH HOLD FOR "; # the cursor declaration is always the same
my $move_all_query = "MOVE ALL ".$cursor_name; # for getting the count
my $move_first_query = "MOVE ABSOLUTE 0 ".$cursor_name; # move back to the top (move first moves to the first entry and a fetch will then get from the 2nd on, we need to go back to the top), BACKWARD ALL also possible
my $fetch_query = "FETCH 10000 FROM ".$cursor_name; # 10000 rows per fetch should be ok in size
our $close_query = "CLOSE ".$cursor_name; # close the cursor at the end
my $run_do_query = ''; # combined do_query + read query for declaration run
# various variables
my $result_rows;
my $rows_read;
my @csv_header = ();
# count and stats
my $first_run = 1;
my $start_run;
my $start_read;
my $count = 0;
my %count_detail = ();
my $percent;
my $_percent = -1;
# csv file handlers
our $CSV_DATA;
# ==== QUERY LOADING ====
# load the query or set the query
$_query{$q_name} = '';
# if it is a readable file, assume we read the query data from the file
if (-f $query) {
	print_out("Reading query from file $query", 1);
	my $FP;
	open($FP, '<:encoding('.$encoding.')', $query) || die ("Can't open $query file for reading query data: $!\n");
	while (<$FP>) {
		chomp $_;
		# skip any line that starts with --
		if ($_ !~ /^(\s+)?--/) {
			# strip out any data after a -- in the string
			$_ =~ s/--.*//g;
			# with safety space in front when joining lines
			$_query{$q_name} .= ' ' if ($_query{$q_name});
			$_query{$q_name} .= $_;
		}
	}
	close($FP);
} elsif (! -t STDIN) {
	# no file given and STDIN is a pipe: read the query from STDIN
	while (<STDIN>) {
		chomp $_;
		# skip any line that starts with --
		if ($_ !~ /^\s+?--/) {
			# strip out any -- that is in the string
			$_ =~ s/--.*//g;
			# with safety space in front when joining lines
			$_query{$q_name} .= ' ' if ($_query{$q_name});
			$_query{$q_name} .= $_;
		}
	}
} else {
	print_out("Setting query from command line", 1);
	$_query{$q_name} = $query;
}
# strip any ; from the query. anywhere, they are not used
$_query{$q_name} =~ s/;//g;
# if the query is not starting a select it is invalid
if ($_query{$q_name} !~ /^(\s+)?(with|select)/i) {
	print "!!! Query needs to start with a SELECT or WITH statement\n";
	# should soft fail. could be with too?
	# but just in case, now it needs to be select
	# should be more strict check: no update, delete, insert, etc in subquery
	exit;
}
# ==== DATABASE CONNECT ====
# open database connection
print_out("Logging into Database: ".$db_connect_string, 1);
# split the user out of the connect string (DBI wants it as a separate argument)
if ($db_connect_string =~ /user=([^;.]*);?/) {
	$db_user = $1;
	$db_connect_string =~ s/user=([^;.]*);?//;
}
# split the password out of the connect string as well
if ($db_connect_string =~ /pass=([^;.]*);?/) {
	$db_pass = $1;
	$db_connect_string =~ s/pass=([^;.]*);?//;
}
$dsn = "DBI:Pg:".$db_connect_string;
$dbh = DBI->connect($dsn, $db_user, $db_pass) || die ("Can't connect to db $db_connect_string with user $db_user\n");
# prepare queries
foreach my $key (keys %_query) {
	my $query = $_query{$key};
	# all csv queries are cursor queries, everything else is normal
	# if no declare is flagged then do not prefix it with a declare
	$run_do_query = (!$no_declare ? $do_query : '').$query;
	# if we have async and no override flag, prepare this query as async
	if (!$no_async) {
		$cur{$key} = $dbh->prepare(qq{ $run_do_query }, {pg_async => PG_ASYNC}) || die "[$key] Can't prepare $DBI::errstr\n";
	} else {
		$cur{$key} = $dbh->prepare(qq{ $run_do_query }) || die "[$key] Can't prepare $DBI::errstr\n";
	}
}
# open output file
open($CSV_DATA, '>:encoding('.$encoding.')', $output_file) || die ("Can't open $output_file for writing: $!\n");
# CSV writer for the export data; CRLF line endings per RFC 4180
my $csv = Text::CSV_XS->new ({
	'binary' => 1,
	'eol' => "\r\n"
});
# ==== QUERY EXECUTION ====
print_out("Execute query [$q_name] ...", 1);
# set overall read time before the query is executed
$start_read = time();
$cur{$q_name}->execute() || die ("Canot execute: ".$cur{$q_name}->errstr."\n");
if (!$no_async) {
	# async run: poll until the server reports the query has finished
	print_out("Waiting for query to execute {$q_name} [", 1, 1);
	my $show_count = 1;
	while (!$dbh->pg_ready) {
		# print a dot only for every 10 polls (% mod)
		print_out(".", 1, 1) if (!($show_count % 10));
		# show time since read start about every 10 min (runs depend on wait time)
		if ($show_count == int(600 / $wait_time)) {
			print_out("(".convert_time(time() - $start_read).")", 1, 1);
			$show_count = 0;
		}
		$show_count ++;
		sleep $wait_time; # wait n seconds
	}
	# pg_result must be called to finish the async query and get its status
	my $end_data = $cur{$q_name}->pg_result || die ("Cannot call pg result: ".$cur{$q_name}->errstr."\n"); # needs to be called?
	print_out("] {Status (".$end_data.") ", 1, 1);
}
# get the row count that will be returned:
# MOVE ALL on the cursor reports the count; otherwise use the statement rows
if (!$no_declare) {
	$result_rows = $dbh->do($move_all_query) || die ("Cannot move all: ".$DBI::errstr."\n");
} else {
	$result_rows = $cur{$q_name}->rows;
}
# normalize DBI's "zero but true" 0E0 return value to a plain 0
$result_rows = 0 if ($result_rows eq '0E0');
# rewind the cursor to the top before the first FETCH
$dbh->do($move_first_query) || die ("Cannot move first ".$DBI::errstr."\n") if (!$no_declare);
print_out("Returned: ".format_number($result_rows).", Run for: ".convert_time(time() - $start_read).((!$no_async) ? '}' : ''), 1);
# if nothing was found, abort
if (!$result_rows) {
	print "Could not find any data, aborting run\n";
	close($CSV_DATA);
	check_output_file($output_file);
	$dbh->do($close_query);
	$dbh->disconnect();
	exit 0;
}
# ==== DATA READ & CSV WRITE ====
$start_run = time();
print_out("Reading data [$q_name]...", 1);
## TODO: if no_declare is set, we need to loop with the basic loop and not with a double exit loop
# prepare the fetch query (FETCH in batches from the cursor)
$sth_read = $dbh->prepare($fetch_query) || die ("Cannot prepare fetch: ".$DBI::errstr."\n");
while (1) {
	# first one will be time consuming
	$sth_read->execute() || die ("Cannot execute fetch: ".$sth_read->errstr."\n");
	# returned rows to see if we should end
	$rows_read = $sth_read->rows;
	last if 0 == $rows_read;
	print_out("... Reading ".format_number($rows_read)." rows of ".format_number($result_rows), 2);
	while (my @data = $sth_read->fetchrow_array) {
		# on the first row ever, read the column names as the CSV header
		# ALTERNATIVE: use the sth read + name as the looper
		if ($first_run) {
			print_out("N in sth p exc: ".join(',', @{$sth_read->{NAME}}), 3);
			foreach my $column (@{$sth_read->{NAME}}) {
				push(@csv_header, $column);
			}
			$csv->combine(@csv_header);
			print $CSV_DATA $csv->string();
			# first run sequence done
			$first_run = 0;
		}
		# count processed data
		$count ++;
		$csv->combine(@data);
		print $CSV_DATA $csv->string();
		# progress output: percent printed without line break whenever it changes
		$percent = sprintf("%d", ($count / $result_rows) * 100);
		if ($percent != $_percent) {
			$_percent = $percent;
			print_out("$percent% ", 1, 1);
		}
	} # outer read loop for declared cursor
}
# add line break after percent output
print_out("", 1);
# ==== CLEANUP & STATISTICS ====
# close all DB connections
print_out("Close DB connection", 2);
$sth_read->finish;
foreach my $my_cur (keys %cur) {
	$cur{$my_cur}->finish;
}
$dbh->do($close_query);
$dbh->disconnect();
# collect run statistics for the summary output below
$count_detail{$q_name}{'lines'} = $count;
$count_detail{$q_name}{'start_time'} = $start_run;
$count_detail{$q_name}{'end_time'} = time();
$count_detail{$q_name}{'time_run'} = $count_detail{$q_name}{'end_time'} - $count_detail{$q_name}{'start_time'};
# check if we have written anything, either empty file or 0 lines, if not unlink the open file
close($CSV_DATA);
check_output_file($output_file);
my $stats = "\n";
$stats .= "* Dump query output to csv finished.\n";
foreach my $q_name (sort keys %count_detail) {
	$stats .= "< Input query: ".$query."\n";
	$stats .= "> Output file: ".$output_file."\n";
	$stats .= "| - Lines written : ".format_number($count_detail{$q_name}{'lines'})." (".convert_number(-s $output_file).")\n";
	$stats .= "| - Data write time: ".convert_time($count_detail{$q_name}{'time_run'}, 1)."\n";
	$stats .= "| - Process speed : ".sprintf("%s lines/s", format_number($count_detail{$q_name}{'lines'} / $count_detail{$q_name}{'time_run'}, 2))."\n";
	$stats .= "| Overall run time : ".convert_time($count_detail{$q_name}{'end_time'} - $start_read)."\n";
}
print_out($stats, 0);
__END__
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# AUTHOR: Clemens Schwaighofer | |
# DATE: 2015/8/6 | |
# DSCRIPTION: | |
# Runs a query from a file or command line and outputs the data to a CSV file | |
# Runs query as a CURSOR (not async, doesn't work at the same time) | |
# Python version of perl script query_to_csv.pl | |
from math import floor | |
import argparse | |
import os | |
import sys | |
import re | |
import csv | |
import signal | |
from datetime import datetime | |
import psycopg2 | |
import psycopg2.extensions | |
import psycopg2.extras | |
# ^C abort handler | |
# ^C abort handler: remove an empty output file, close cursor and DB, then exit.
def signal_handler(signum, frame):
    """Handle SIGINT: clean up output file, cursor and connection, then exit.

    BUGFIX: the original used ``except cursor.cursor():`` — an ``except``
    clause must name an exception class, and calling ``cursor.cursor()``
    raises itself. Close the handles and guard against them not existing
    yet (NameError) or already being closed (psycopg2.Error).
    """
    print("Cought an abort on signal {}".format(signum))
    RemoveOutputFile(args.output_file.name)
    # close the server side cursor, if one is open
    try:
        cursor.close()
    except (NameError, psycopg2.Error):
        print("No open cursor")
    # close the database connection, if one is open
    try:
        dbh.close()
    except (NameError, psycopg2.Error):
        print("No open database connection")
    sys.exit(0)
# for argparse | |
# call: type=IntRange(n, m) | |
# custom defined range for n to m where data outside it is false, plus print only start and end for error | |
# for argparse
# call: type=IntRange(n, m)
class IntRange(object):
    """argparse type factory that only accepts integers inside [start, stop].

    A single constructor argument means the range [0, start]. Values outside
    the range raise ArgumentTypeError showing only the range bounds.
    """
    def __init__(self, start, stop=None):
        # one-argument form: the range runs from 0 up to the given value
        if stop is None:
            start, stop = 0, start
        self.start, self.stop = start, stop
    def __call__(self, value):
        value = int(value)
        # chained comparison: inside range -> pass the value through
        if self.start <= value <= self.stop:
            return value
        raise argparse.ArgumentTypeError('value out of of range between {} and {}'.format(self.start, self.stop))
# METHOD: ConvertTimestamp | |
# PARAMS: timestamp, 1/0 for micro time output | |
# RETURN: formated string of the input timestamp in days, hours, minutes, seconds and optional micorseconds | |
# DESC : make time string from seconds interval timestamp | |
# METHOD: ConvertTimestamp
# PARAMS: timestamp, 1/0 for micro time output
# RETURN: formatted string of the input timestamp in days, hours, minutes,
#         seconds and optional fractional part
# DESC  : make time string from seconds interval timestamp
def ConvertTimestamp(timestamp, show_micro=0):
    # round to four fractional digits, then split seconds from the fraction
    timestamp = str(round(float(timestamp), 4))
    (timestamp, ms) = timestamp.split('.')
    timestamp = int(timestamp)
    ms = int(ms)
    output = []
    # break the whole seconds down into days, hours, minutes, seconds
    for i in [86400, 3600, 60, 1]:
        output.append(int(floor(timestamp / i)))
        timestamp = timestamp % i
    # output has days|hours|min|sec ms
    time_string = ''
    if output[0]:
        # BUGFIX: trailing space was missing after the day count ("1d1h" -> "1d 1h"),
        # matching the other units and the Perl original
        time_string = '%sd ' % output[0]
    if output[0] or output[1]:
        time_string += '%sh ' % output[1]
    if output[0] or output[1] or output[2]:
        time_string += '%sm ' % output[2]
    time_string += '%ss' % output[3]
    if show_micro:
        time_string += ' %sms' % ms if ms else ' 0ms'
    return time_string
# METHOD: FormatBytes | |
# PARAMS: bytes data in numeric | |
# RETURN: formated string | |
# DESC : convert bytes into human readable format | |
# METHOD: FormatBytes
# PARAMS: bytes data in numeric
# RETURN: formatted string
# DESC  : convert bytes into human readable format (binary prefixes Ki..Yi)
def FormatBytes(num, suffix='B'):
    # treat None/0/'' as zero bytes
    num = num or 0
    # scale down by 1024 until the value fits the current unit
    for unit in ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi'):
        if abs(num) < 1024.0:
            return "%3.1f%s%s" % (num, unit, suffix)
        num /= 1024.0
    # anything beyond Zi is reported in Yi
    return "%.1f%s%s" % (num, 'Yi', suffix)
# METHOD: wait | |
# PARAMS: connection | |
# RETURN: none | |
# DESC : if the connection is in ASYNC mode, this has to be called after connection and each query execution | |
# METHOD: wait
# PARAMS: connection
# RETURN: none
# DESC  : if the connection is in ASYNC mode, this has to be called after
#         connecting and after each query execution; polls until POLL_OK
def wait(conn):
    import select
    status = ''
    print("Async Status: ", end='')
    while 1:
        # BUGFIX: poll the connection that was passed in, not the global dbh
        state = conn.poll()
        if state == psycopg2.extensions.POLL_OK:
            print("READY({}/{})".format(state, psycopg2.extensions.POLL_OK))
            break
        elif state == psycopg2.extensions.POLL_WRITE:
            # wait until the connection socket becomes writable
            status = "w({})".format(state)
            select.select([], [conn.fileno()], [])
        elif state == psycopg2.extensions.POLL_READ:
            # wait until the connection socket becomes readable
            status = ("r({})".format(state))
            select.select([conn.fileno()], [], [])
        else:
            print("[!]({})".format(state))
            raise psycopg2.OperationalError("poll() returned %s" % state)
        # progress marker for this poll iteration
        print("{}".format(status), end='')
# METHOD: RemoveOutputFile | |
# PARAMS: file name | |
# RETURN: none | |
# DESC : checks if file is 0 bytes, and if yes removes it | |
# METHOD: RemoveOutputFile
# PARAMS: file name
# RETURN: none
# DESC  : deletes the file again when nothing was written into it (0 bytes)
def RemoveOutputFile(file_name):
    # only a zero byte file gets removed; anything with content is kept
    if not os.stat(file_name).st_size:
        print("[!] Empty output file: {}. Will remove this file.".format(file_name))
        os.unlink(file_name)
# register the ^C signal catcher
signal.signal(signal.SIGINT, signal_handler)
# ==== ARGUMENT PARSE ====
# launch parser
parser = argparse.ArgumentParser(
    description='Reads query from commandline or file and processes it in a cursor based run and prints out the result to the given output file as CSV.',
    epilog='Default run uses asynchronous query. Use --no-async to turn off.'
)
# the options
# query string or file name (defaults to STDIN when data is piped in)
parser.add_argument(
    '-q',
    '--query',
    default=sys.stdin,
    metavar='QUERY OR FILE NAME',
    help='Can either be a file that has the query inside or the query itself in a string'
)
# the output csv file (argparse opens it for writing directly)
parser.add_argument(
    '-o',
    '--output',
    required=True,
    type=argparse.FileType(mode='w', encoding='utf-8'),
    dest='output_file',
    metavar='OUTPUT FILE NAME',
    help='The data read from the query is written into this file including headers'
)
# database connect string
parser.add_argument(
    '-d',
    '--db',
    required=True,
    dest='db_connect_string',
    metavar='DATABASE CONNECT STRING',
    help='Connection data in the following format: user=;pass=;dbname=;host=;port='
)
# wait time for async check, restricted to 1..60 seconds via IntRange
parser.add_argument(
    '-w',
    '--wait',
    type=IntRange(1, 60),
    default=10,
    dest='wait_time',
    metavar='TIME IN SECONDS',
    help='Override the default wait time of 10 seconds, Needs to be in range of 1 to 60'
)
# no async flag
parser.add_argument(
    '--no-async',
    action='store_true',
    dest='no_async',
    help='Don\'t run the query in async form'
)
# verbose (repeatable, counts occurrences)
parser.add_argument (
    '-v',
    '--verbose',
    action='count',
    default=0,
    help='verbose setting'
)
# read in the arguments
args = parser.parse_args()
# ==== ARGUMENT CHECK ====
error = 0
# need basic check on connect string for at least user and dbname
if not re.match('user=([^;.]*);?', args.db_connect_string) and not re.match('dbname=([^;.]*);?', args.db_connect_string):
    print("The db connection string needs at least a user and database name in the format \"user=;dbname=\"")
    error = 1
if error == 1:
    sys.exit(2)
# ==== QUERY PARAMETER CHECK ====
# check query input
# should write to new variable for work?
query_data_is_file = False
query_data_is_stdin = False
# if (os.stat(args.query).st_size == 0):
# args.query still being a file-like object means the STDIN default was kept
if (sys.stdin and hasattr(args.query, 'name')):
    print("* Assume query is STDIN")
    query_data_is_file = True
    query_data_is_stdin = True
elif (not os.path.isfile(args.query)):
    # not a file on disk -> treat the argument as the query text itself
    print("* Assume direct query: {}".format(args.query))
elif (os.path.isfile(args.query) and os.stat(args.query).st_size > 0):
    # a non-empty file: replace the argument with an open file handle
    print("* Assume query is file: {}".format(args.query))
    args.query = open(args.query, encoding='utf-8')
    query_data_is_file = True
else:
    print("! Query is not defined: EXIT ({})".format(args.query))
    sys.exit(1)
# read the query data from the file handle (file or STDIN), stripping comments
query_data = ''
if query_data_is_file is True:
    regex_comment = re.compile('^(\s+)?--')
    regex_inline_comment = re.compile('--.*')
    # "with" would be nicer, but that only works with a file parameter, not with stdin
    for line in args.query:
        # skip any line that starts with a -- comment
        # if not re.match('^(\s+)?--', line) and len(line) > 0:
        if not regex_comment.match(line) and len(line) > 0:
            # remove any part after a -- in the line
            # line = re.sub(r'--.*', '', line)
            line = regex_inline_comment.sub('', line)
            # add a space between the joined lines
            if query_data:
                query_data += ' '
            # remove any trailing white space (line breaks, etc)
            query_data += line.rstrip()
        # END IF MATCH
    # close
    args.query.close()
else:
    # for direct data, as is
    query_data = args.query
# if input is file, close file
if query_data_is_file is True:
    args.query.close()
# remove any ; inside the query
query_data = re.sub(r';', '', query_data)
# check that query is a select, ignore all others for now (also with calls)
if not re.match('^(\s+)?(select|with)', query_data, flags=re.IGNORECASE):
    print("[!] Query needs to start with a SELECT or WITH statement: {}".format(query_data))
    RemoveOutputFile(args.output_file.name)
    sys.exit(1)
# print ("Query: {}".format(query_data))
# ==== CSV FILE OPEN FOR WRITE ====
# open CSV writer on the already-opened output file
csvWrite = csv.writer(args.output_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
# ==== POSTGRESQL OPEN ====
# open postgresql connection
# prepare connect string: replace ; with space and pass= with password=
args.db_connect_string = re.sub(r';', ' ', args.db_connect_string)
args.db_connect_string = re.sub(r'pass=', 'password=', args.db_connect_string)
# try connection
try:
    print("> Logging into database: {}".format(args.db_connect_string))
    # BUGFIX: 'async' is a reserved keyword since Python 3.7, so
    # "psycopg2.connect(..., async=False)" is a SyntaxError; psycopg2 >= 2.7
    # provides the 'async_' alias (False, i.e. synchronous, is also the default)
    dbh = psycopg2.connect(args.db_connect_string, async_=False)
except psycopg2.Error as e:
    print("[!] Connection could not be established with: {}, Code: {}, Error: {}".format(args.db_connect_string, e.pgcode, e.pgerror))
    sys.exit(1)
# ==== POSTGRESQL CURSOR ==== | |
print("Execute query ... ", end='', flush=True) | |
start_time_query = datetime.now().timestamp() | |
# cursor is csr_ + timestamp _ microtime | |
cursor = dbh.cursor('csr_'.str(datetime.now().timestamp()).replace('.', '_'), scrollable=True) | |
try: | |
cursor.execute(query_data) | |
except psycopg2.Error as e: | |
print("\n[!] Query could not be executed\nCode: {}\nError: {}".format(e.pgcode, e.pgerror)) | |
# close database and file | |
RemoveOutputFile(args.output_file.name) | |
dbh.close() | |
sys.exit(1) | |
# wait(cursor.connection) | |
# scroll +1 to do the final execution | |
print("| ", end='', flush=True) | |
cursor.scroll(1) | |
end_time_query = datetime.now().timestamp() | |
print("[DONE] ({})".format(ConvertTimestamp(end_time_query - start_time_query, 1)), flush=True) | |
# get max row counts via scroll
# scroll until you get "MOVE 0" in the statusmessage
# sum up all previous counts = max rows
print("Reading max rows from cursor ... ", end='', flush=True)
start_time_rows = datetime.now().timestamp()
move_cursor_value = 100000
max_rows = 0
# move to top
try:
    cursor.scroll(0, mode='absolute')
except:
    print("Cannot scroll this cursor")
while not re.match('MOVE 0', cursor.statusmessage) or not max_rows:
    cursor.scroll(move_cursor_value, mode='relative')
    m = re.search('MOVE (\d+)', cursor.statusmessage)
    # if max rows is 0 and the return of the m.group is also 0, we abort
    # (-1 is used as the "no data at all" marker below)
    if max_rows == 0 and int(m.group(1)) == 0:
        max_rows = -1
    else:
        max_rows += int(m.group(1))
# if the max_rows is -1; we have an error and exit
if max_rows == -1:
    print("Could not find any data, aborting run")
    # close cursor, dbh and file
    RemoveOutputFile(args.output_file.name)
    cursor.close()
    dbh.close()
    # and exit
    sys.exit(1)
# move to top
cursor.scroll(0, mode='absolute')
end_time_rows = datetime.now().timestamp()
print("[DONE] {0:,} rows ({1})".format(max_rows, ConvertTimestamp(end_time_rows - start_time_rows, 1)), flush=True)
print("Reading data into CSV file:", flush=True)
start_time_run = datetime.now().timestamp()
# set iteration size to a higher value (rows fetched per network round trip)
iter_cursor_value = 100000
cursor.itersize = iter_cursor_value
# row print & csv write
first_run = 0
row_count = 0
_percent = 0
for row in cursor:
    # first run sets header in CSV file
    if first_run == 0:
        csv_header = []
        # create header names based on col names from the query
        for col in cursor.description:
            csv_header.append(col.name)
        # write them to the CSV file
        csvWrite.writerow(csv_header)
        first_run = 1
    # write normal data to csv file
    csvWrite.writerow(row)
    # row counter
    row_count += 1
    # percent progress output, printed only when the value changes
    percent = "{0:d}".format(round((row_count / max_rows) * 100), 0)
    if percent != _percent:
        _percent = percent
        print("{}% ".format(percent), end='', flush=True)
cursor.close()
end_time_run = datetime.now().timestamp()
print("\n[DONE]", flush=True)
print("- Close DB connection and CSV file handler")
# close DB connection
dbh.close()
# close csv file handler
args.output_file.close()
# ==== POST CLEAN UP ====
# post clean up
# unlink output_file if it is empty
RemoveOutputFile(args.output_file.name)
# ==== STATS OUTPUT ====
# calculate the run times for each phase
query_time = end_time_query - start_time_query
count_time = end_time_rows - start_time_rows
run_time = end_time_run - start_time_run
print("")
print("* Dump query output to csv finished.")
print("+ Run from {} to {}".format(datetime.fromtimestamp(start_time_query).strftime('%Y-%m-%d %H:%M:%S'), datetime.fromtimestamp(end_time_run).strftime('%Y-%m-%d %H:%M:%S')))
print("< Input query: {}".format(query_data if query_data_is_stdin or not query_data_is_file else args.query.name))
print("> Output file: {}".format(args.output_file.name))
# alt format(row_count, ',d')
print("|- Lines written : {0:,} ({1})".format(row_count, FormatBytes(os.stat(args.output_file.name).st_size)))
print("|- Query run time : {}".format(ConvertTimestamp(query_time, 1)))
print("|- Rows count time: {}".format(ConvertTimestamp(count_time, 1)))
print("|- Data write time: {}".format(ConvertTimestamp(run_time, 1)))
print("|- Process speed : {0:,.2f} lines/s".format(float(row_count) / float(run_time)))
print("| Overall run time: {}".format(ConvertTimestamp(query_time + count_time + run_time, 1)))
# __END__
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment