""" | |
ETL step wrapper to extract data from mysql to S3, compressing it along the way | |
""" | |
from dataduct.config import Config
from dataduct.steps.etl_step import ETLStep
from dataduct.pipeline import CopyActivity
from dataduct.pipeline import MysqlNode
from dataduct.pipeline import PipelineObject
from dataduct.pipeline import Precondition
from dataduct.pipeline import ShellCommandActivity
from dataduct.utils.helpers import exactly_one
from dataduct.utils.exceptions import ETLInputError
from dataduct.database import SelectStatement
config = Config()
if not hasattr(config, 'mysql'):
    raise ETLInputError('MySQL config not specified in ETL')
MYSQL_CONFIG = config.mysql


class ExtractMysqlGzipStep(ETLStep):
    """ETL step that extracts data from MySQL to S3, optionally
    gzip-compressing the output along the way.
    """
    def __init__(self,
                 table=None,
                 sql=None,
                 host_name=None,
                 database=None,
                 output_path=None,
                 splits=1,
                 gzip=False,
                 **kwargs):
        """Constructor for the ExtractMysqlGzipStep class

        Args:
            table(str): name of the table to extract
            sql(str): SQL select statement to extract with (exactly one of
                table or sql must be given)
            host_name(str): key into the MySQL config section holding the
                host address and credentials
            database(str): database on which to execute the query
            output_path(str): S3 path for the extracted output
            splits(int): number of files to split the output into
            gzip(bool): whether to gzip-compress the output files
            **kwargs(optional): Keyword arguments directly passed to base class
        """
        if not exactly_one(table, sql):
            raise ETLInputError('Exactly one of table or sql must be given')

        super(ExtractMysqlGzipStep, self).__init__(**kwargs)

        if table:
            sql = 'SELECT * FROM %s;' % table
        elif sql:
            table = SelectStatement(sql).dependencies[0]
        else:
            raise ETLInputError('Provide a sql statement or a table name')
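
        # For illustration (an assumption about SelectStatement's API, based
        # on its use just above): given sql = "SELECT id FROM users WHERE
        # active = 1", SelectStatement(sql).dependencies names the referenced
        # tables, so `table` here would resolve to 'users'.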

        host = MYSQL_CONFIG[host_name]['HOST']
        user = MYSQL_CONFIG[host_name]['USERNAME']
        password = MYSQL_CONFIG[host_name]['PASSWORD']

        input_node = self.create_pipeline_object(
            object_class=MysqlNode,
            schedule=self.schedule,
            host=host,
            database=database,
            table=table,
            username=user,
            password=password,
            sql=sql,
        )
        s3_format = self.create_pipeline_object(
            object_class=PipelineObject,
            type='TSV',
        )
        ### Here's what's happening: Data Pipeline does not like to encrypt
        ### when the next task is not Redshift, so we trick it. The S3 data
        ### node is disconnected from the cycle, and a "fake" one pointing at
        ### the same path is added. The fake node has a precondition (which
        ### appears to use polling) to ensure the path exists before the
        ### downstream activity attempts to run.
        compression = 'gzip' if gzip else 'none'
        intermediate_node = self.create_s3_data_node(
            format=s3_format, compression=compression)

        ### when gzip is off, skip the fake node to conserve the number of
        ### objects created
        fake_intermediate_node = None
        if gzip:
            precondition = self.create_pipeline_object(
                object_class=Precondition,
                is_directory=True,
            )
            fake_intermediate_node = self.create_s3_data_node(
                format=s3_format, precondition=precondition)
            ### explicitly set the directory path to that of intermediate_node
            fake_intermediate_node.fields['directoryPath'][0] = \
                intermediate_node.fields['directoryPath'][0]

        self.create_pipeline_object(
            object_class=CopyActivity,
            schedule=self.schedule,
            resource=self.resource,
            worker_group=self.worker_group,
            input_node=input_node,
            output_node=intermediate_node,
            depends_on=self.depends_on,
            max_retries=self.max_retries,
        )
        self._output = self.create_s3_data_node(
            self.get_output_s3_path(output_path))

        gunzip_part = \
            "for file in ${INPUT1_STAGING_DIR}/*.gz; do gunzip -f ${file}; done; " \
            if gzip else ""
        gzip_part = \
            "; for file in ${OUTPUT1_STAGING_DIR}/*; do gzip -f $file; done" \
            if gzip else ""

        # This shouldn't be necessary, but MySQL uses \\n as NULL, so we
        # need to rewrite it below.
        command = ' '.join([
            "[[ -z $(find ${INPUT1_STAGING_DIR} -maxdepth 1"
            " ! -path ${INPUT1_STAGING_DIR} -name '*' -size +0) ]]"
            " && touch ${OUTPUT1_STAGING_DIR}/part-0 ",
            "|| ",
            gunzip_part,
            "cat ${INPUT1_STAGING_DIR}/*",
            "| sed 's/\\\\\\\\n/NULL/g'",  # replace \\n
            # get rid of control characters
            "| tr -d '\\\\000'",
            # split into `splits` roughly equal-sized files
            ("| split -a 4 -d"
             " -l $((($(cat ${{INPUT1_STAGING_DIR}}/* | wc -l) +"
             " {splits} - 1) / {splits}))"
             " - ${{OUTPUT1_STAGING_DIR}}/part-").format(splits=splits),
            "; for f in ${INPUT1_STAGING_DIR}/*; do echo ${f}; file ${f}; done ",
            gzip_part,
        ])
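
        # For illustration, with splits=2 and gzip=True the joined command
        # reads roughly as below. This is a sketch; it assumes the doubled
        # backslashes above survive one extra unescaping pass when the
        # pipeline definition is serialized, so the shell ultimately sees
        # the single-escaped patterns shown here:
        #
        #   [[ -z $(find ${INPUT1_STAGING_DIR} -maxdepth 1 \
        #        ! -path ${INPUT1_STAGING_DIR} -name '*' -size +0) ]] \
        #   && touch ${OUTPUT1_STAGING_DIR}/part-0 \
        #   || for file in ${INPUT1_STAGING_DIR}/*.gz; do gunzip -f ${file}; done; \
        #      cat ${INPUT1_STAGING_DIR}/* \
        #      | sed 's/\\n/NULL/g' \
        #      | tr -d '\000' \
        #      | split -a 4 -d \
        #          -l $((($(cat ${INPUT1_STAGING_DIR}/* | wc -l) + 1) / 2)) \
        #          - ${OUTPUT1_STAGING_DIR}/part- \
        #   ; for f in ${INPUT1_STAGING_DIR}/*; do echo ${f}; file ${f}; done \
        #   ; for file in ${OUTPUT1_STAGING_DIR}/*; do gzip -f $file; done
        #
        # The leading test emits an empty part-0 when the input directory has
        # no non-empty files; the pipeline rewrites MySQL's escaped NULL
        # markers, drops NUL bytes, and splits the stream into `splits` files
        # of roughly equal line counts. The trailing loops log each input
        # file and gzip the outputs.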

        input_node = fake_intermediate_node if gzip else intermediate_node
        self.create_pipeline_object(
            object_class=ShellCommandActivity,
            input_node=input_node,
            output_node=self.output,
            command=command,
            max_retries=self.max_retries,
            resource=self.resource,
            worker_group=self.worker_group,
            schedule=self.schedule,
        )
    @classmethod
    def arguments_processor(cls, etl, input_args):
        """Parse the step arguments according to the ETL pipeline

        Args:
            etl(ETLPipeline): Pipeline object containing resources and steps
            input_args(dict): Dictionary of the step arguments for the class
        """
        input_args = cls.pop_inputs(input_args)
        step_args = cls.base_arguments_processor(etl, input_args)
        return step_args
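

# A hypothetical usage sketch (not from the original gist): dataduct steps
# are normally declared in a YAML pipeline definition, and this assumes the
# class above is registered as a custom step named `extract-mysql-gzip` in
# the dataduct config. The host alias, database, table, splits, and S3 path
# below are all placeholders.
#
#   steps:
#   -   step_type: extract-mysql-gzip
#       host_name: warehouse_mysql
#       database: analytics
#       table: events
#       splits: 4
#       gzip: true
#       output_path: s3://example-bucket/extracts/events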