Last active
October 9, 2020 09:03
-
-
Save aK0nshin/0ef25d374ef76284028edbb3875b25c2 to your computer and use it in GitHub Desktop.
Find today's backup files and directories by the date in their names and print them as JSON.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.4.10 | |
import argparse | |
import json | |
import os | |
import re | |
import time | |
from datetime import datetime, timedelta | |
from scandir import scandir | |
__version__ = "0.27" | |
def find_yesterday(yesterday_path):
    """Tell whether a non-empty filesystem entry exists at ``yesterday_path``.

    Returns 1 when the path can be stat'ed and reports a non-zero size,
    and 0 when the stat call fails with ``OSError`` or the size is zero.
    Progress messages are printed when the module-level ``args.verbose``
    flag is set.
    """
    try:
        size = os.stat(yesterday_path).st_size
    except OSError:
        # Any stat failure is treated as "no backup".
        if args.verbose:
            print("Cant find: {}".format(yesterday_path))
        return 0

    if args.verbose:
        print("Found: {}, size: {}".format(yesterday_path, size))
    # Collapse the size to a 0/1 flag.
    return int(bool(size))
def find_pg_yesterday(path):
    """Check for yesterday's counterpart of a PostgreSQL backup path.

    Every run of eight digits in ``path`` is replaced with the module-level
    ``yesterday`` formatted as ``YYYYMMDD``; the resulting path is handed to
    ``find_yesterday`` and its 0/1 answer is returned.
    """
    stamp = yesterday.strftime('%Y%m%d')
    return find_yesterday(re.sub(r"\d{8}", stamp, path))
def find_mysql_yesterday(path):
    """Check for yesterday's counterpart of a MySQL backup path.

    Every ``NNNN-NN-NN`` digit group in ``path`` is replaced with the
    module-level ``yesterday`` formatted as ``YYYY-MM-DD``; the resulting
    path is handed to ``find_yesterday`` and its 0/1 answer is returned.
    """
    stamp = yesterday.strftime('%Y-%m-%d')
    return find_yesterday(re.sub(r"\d{4}-\d{2}-\d{2}", stamp, path))
def add_pg_stat_to_result(path, result):
    """Append one record per today's PostgreSQL backup found under an instance.

    The instance directory is walked as ``<instance>/<utility>/<dump>``.
    A dump is reported when its name contains an eight-digit ``YYYYMMDD``
    date equal to the module-level ``today``.

    Args:
        path: directory entry of the instance; its ``.path`` is scanned and
            its ``.name`` is reported as ``instance``.
        result: list that is extended in place with one dict per dump.

    Reads the module-level ``args``, ``today`` and ``backup_type``.
    """
    # Utility directory name -> reported data type; anything else is 'unknown'.
    # NOTE(review): the MySQL collector maps "dump" to "schema" while this one
    # maps it to "content" - confirm the difference is intended.
    data_types = {"dump": "content", "basebackup": "schema"}

    for utility_path in scandir(path.path):
        if args.verbose:
            print("Go in utility_path: {}".format(utility_path.path))
        # A stray file next to the utility directories would make scandir()
        # raise and abort the whole run; skip it like the MySQL collector does.
        if not utility_path.is_dir():
            if args.verbose:
                print("{} is not a directory, skipping".format(utility_path.path))
            continue
        data_type = data_types.get(utility_path.name, 'unknown')

        for dump_path in scandir(utility_path.path):
            match = re.match(r".*(\d{8}).*", dump_path.name)
            if not match:
                if args.verbose:
                    print("No date in: {}, skipping".format(dump_path.name))
                continue
            try:
                backup_date = datetime.strptime(match.group(1), '%Y%m%d')
            except ValueError:
                # Eight digits that do not form a calendar date (e.g. a
                # numeric id) must not crash the scan.
                if args.verbose:
                    print("Invalid date in: {}, skipping".format(dump_path.name))
                continue
            if backup_date != today:
                if args.verbose:
                    print("Not today: {}, skipping".format(dump_path.name))
                continue

            # The database name is whatever precedes "_<date>"; fall back to
            # the instance name when the dump name has no such prefix.
            m = re.match(r"(.*)_\d{8}", dump_path.name)
            db_name = m.group(1) if m else path.name

            stat = os.stat(dump_path.path)
            objects = [{"backup_type": backup_type,
                        "instance": path.name,
                        "data_type": data_type,
                        "creation_date": backup_date.timestamp(),
                        "db_name": db_name,
                        "size": stat.st_size,
                        "has_yesterday": find_pg_yesterday(dump_path.path)}]
            if args.verbose:
                print("Add objects to result: {}".format(objects))
            result.extend(objects)
def add_mysql_stat_to_result(path, result):
    """Append one record per today's MySQL backup found under an instance.

    The instance directory is walked as
    ``<instance>/<date dir>/<subfolder>/<db>``. Only date directories whose
    name contains a ``YYYY-MM-DD`` date equal to the module-level ``today``
    are entered, and only entries that are directories or ``.sql.gz`` files
    are reported as databases.

    Args:
        path: directory entry of the instance; its ``.path`` is scanned and
            its ``.name`` is reported as ``instance``.
        result: list that is extended in place with one dict per database.

    Reads the module-level ``args``, ``today`` and ``backup_type``.
    """
    # Subfolder name -> reported data type; anything else is 'unknown'.
    data_types = {"dump": "schema", "data": "content"}

    for date_path in scandir(path.path):
        if args.verbose:
            print("Go in date_path: {}".format(date_path.path))
        if not date_path.is_dir():
            if args.verbose:
                print("{} is not a directory, skipping".format(date_path.path))
            continue
        match = re.search(r'\d{4}-\d{2}-\d{2}', date_path.name)
        if not match:
            if args.verbose:
                print("No date in: {}, skipping".format(date_path.name))
            continue
        try:
            backup_date = datetime.strptime(match.group(), '%Y-%m-%d')
        except ValueError:
            # Date-shaped digits that are not a calendar date (e.g.
            # "2020-13-45") must not crash the scan.
            if args.verbose:
                print("Invalid date in: {}, skipping".format(date_path.name))
            continue
        if backup_date != today:
            if args.verbose:
                print("Not today: {}, skipping".format(date_path.name))
            continue

        for subfolder in scandir(date_path.path):
            if args.verbose:
                print("Go in subfolder: {}".format(subfolder.path))
            if not subfolder.is_dir():
                if args.verbose:
                    print("{} is not a directory, skipping".format(subfolder.path))
                continue
            data_type = data_types.get(subfolder.name, 'unknown')

            for db_path in scandir(subfolder.path):
                if args.verbose:
                    print("Go in db_path: {}".format(db_path.path))
                if not (db_path.is_dir() or db_path.name.endswith('.sql.gz')):
                    if args.verbose:
                        print("{} is not a directory or .sql.gz archive, skipping".format(db_path.path))
                    continue

                stat = os.stat(db_path.path)
                # Strip the 7-character ".sql.gz" suffix from archive names.
                if db_path.name.endswith(".sql.gz"):
                    db_name = db_path.name[:-7]
                else:
                    db_name = db_path.name
                objects = [{"backup_type": backup_type,
                            "instance": path.name,
                            "data_type": data_type,
                            "creation_date": backup_date.timestamp(),
                            "db_name": db_name,
                            "size": stat.st_size,
                            "has_yesterday": find_mysql_yesterday(db_path.path)}]
                # Same verbose trace as the PostgreSQL collector.
                if args.verbose:
                    print("Add objects to result: {}".format(objects))
                result.extend(objects)
# Command-line interface: one or more service directories plus a verbosity
# switch. Parsing happens at module level because the helper functions read
# the resulting ``args`` namespace as a global.
parser = argparse.ArgumentParser(description='Find backups in directories.')
# Positional values stay plain strings, which is argparse's default.
parser.add_argument('paths', nargs='+', help='path(s) to service(s)')
# store_true already defaults to False and stores into ``args.verbose``.
parser.add_argument('-v', '--verbose', action='store_true',
                    help='Show log messages.')
args = parser.parse_args()
if __name__ == '__main__':
    # Script driver: scan every service path given on the command line and
    # print the collected backup records as one JSON array.
    start_time = time.time()

    # ``today`` and ``yesterday`` are read as globals by the helper
    # functions; both are naive datetimes at local midnight.
    today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    yesterday = today - timedelta(days=1)

    if args.verbose:
        print("Got current date: {}".format(today))
        print("Got paths: {}".format(args.paths))

    result = []
    for service_dir in args.paths:
        # The service type is the last path component; when the path ends
        # with "/" the basename is empty, so take the component before it.
        # ``backup_type`` is also read as a global by the collectors.
        backup_type = os.path.basename(service_dir) or service_dir.split("/")[-2]

        for instance_entry in scandir(service_dir):
            if args.verbose:
                print("Go in instance_path: {}".format(instance_entry.path))

            if instance_entry.is_dir():
                # Pick the collector by substring of the service type.
                if "mysql" in backup_type:
                    collect = add_mysql_stat_to_result
                elif "pg" in backup_type:
                    collect = add_pg_stat_to_result
                else:
                    raise Exception("Unknown service type: {}".format(backup_type))
                collect(instance_entry, result)
            elif args.verbose:
                print("{} is not a directory, skipping".format(instance_entry.path))

    if args.verbose:
        print("--- Executed in {:.3f} seconds ---\n--- Result: ---".format(time.time() - start_time))
    print(json.dumps(result))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment