Skip to content

Instantly share code, notes, and snippets.

@laughingman7743
Last active March 8, 2017 01:46
Show Gist options
  • Save laughingman7743/80db047cfb6c120f9b9f93fd79fe843d to your computer and use it in GitHub Desktop.
Save laughingman7743/80db047cfb6c120f9b9f93fd79fe843d to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import unicode_literals
import logging
import time
import paramiko
_logger = logging.getLogger(__name__)
class S3DistcpError(Exception):
pass
class S3DistcpClient(object):
KEEP_ALIVE_INTERVAL = 10
BUFFER_SIZE = 1024
JAR_PATH = '/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar'
def __init__(self, rsa_key_path, host, port=22, username='hadoop'):
self.host = host
self.port = port
self.rsa_key_path = rsa_key_path
self.username = username
self._connect()
def _connect(self):
self.transport = paramiko.Transport((self.host, self.port))
self.transport.connect()
self.transport.set_keepalive(self.KEEP_ALIVE_INTERVAL)
self.transport.auth_publickey(self.username, paramiko.RSAKey(filename=self.rsa_key_path))
def _build_command(self, src_bucket_name, src, dest_bucket_name, dest,
group_by, target_size, *args, **kwargs):
cmds = [
'hadoop',
'jar',
self.JAR_PATH,
"--src='s3://{0}/{1}'".format(src_bucket_name, src),
"--dest='s3://{0}/{1}'".format(dest_bucket_name, dest),
"--groupBy='{0}'".format(group_by),
'--targetSize={0}'.format(target_size)
]
for arg in args:
if arg:
cmds.extend(["--{0}".format(arg)])
for key, val in kwargs.iteritems():
if key and val:
cmds.extend(["--{0}='{1}'".format(key, val)])
return ' '.join(cmds)
def exec_command(self, src_bucket_name, src, dest_bucket_name, dest,
group_by='(.)*', target_size=2048, *args, **kwargs):
cmd = self._build_command(src_bucket_name, src, dest_bucket_name, dest,
group_by, target_size, *args, **kwargs)
_logger.info(cmd)
session = self.transport.open_session()
session.set_combine_stderr(True)
session.exec_command(cmd)
while not session.exit_status_ready():
if session.recv_ready():
_logger.info(session.recv(self.BUFFER_SIZE))
else:
time.sleep(10)
while session.recv_ready():
_logger.info(session.recv(self.BUFFER_SIZE))
exit_status = session.recv_exit_status()
_logger.info('Distcp exit status: %d', exit_status)
session.close()
if exit_status != 0:
raise S3DistcpError
def close(self):
self.transport.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
if __name__ == '__main__':
with S3DistcpClient('/path/to/id_rsa', 'host_name') as client:
# basic usage
client.exec_command('mybucket', 'path/to/',
'mybucket', 'path/to/')
# filtering
from datetime import datetime
client.exec_command('mybucket', 'path/to/',
'mybucket', 'path/to/',
srcPattern=datetime.now().strftime('.*\.%Y-%m-%d.*\.gz'),
outputCodec='gz')
# gzip compress per 1GB
client.exec_command('mybucket', 'path/to/',
'mybucket', 'path/to/',
target_size=1024,
outputCodec='gz')
# more complex example
client.exec_command('mybucket', 'logs/j-XXXXXXXXXXXXX/node/',
'mybucket', 'output/',
s3Endpoint='s3-eu-west-1.amazonaws.com',
srcPattern='.*[a-zA-Z,]+')
client.exec_command('mybucket', 'cloudfront/',
'mybucket', 'output/',
'.*XABCD12345678.([0-9]+-[0-9]+-[0-9]+-[0-9]+).*',
128,
'deleteOnSuccess',
outputCodec='lzo')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment