Skip to content

Instantly share code, notes, and snippets.

@maljolani
Forked from metamatik/migrate-redis.py
Last active May 2, 2019 12:50
Show Gist options
  • Save maljolani/7d71b73afda27980e985e53d02feb9aa to your computer and use it in GitHub Desktop.
Save maljolani/7d71b73afda27980e985e53d02feb9aa to your computer and use it in GitHub Desktop.
Migrate Redis data on Amazon ElastiCache
"""
Forked and heavily adappted from:
https://gist.github.com/kitwalker12/517d99c3835975ad4d1718d28a63553e
Copies all keys from the source Redis host to the destination Redis host.
Useful to migrate Redis instances where commands like SLAVEOF and MIGRATE are
restricted (e.g. on Amazon ElastiCache).
The script scans through the keyspace of the given database number and uses
a pipeline of DUMP and RESTORE commands to migrate the keys.
Requires Redis 2.8.0 or higher.
Python requirements:
click
progressbar
redis
"""
import click
from progressbar import ProgressBar
from progressbar.widgets import Bar, ETA, Percentage
import redis
DEFAULT_REDIS_PORT = 6379
DEFAULT_REDIS_DB = 0
def _copy_keys(keys, src_pipe, dst, replace, non_existing, already_existing, remove):
# We execute the source pipeline.
src_result = src_pipe.execute()
# We create the matching destination pipeline.
dst_pipe = dst.pipeline()
for key, ttl, data in zip(keys, src_result[::2], src_result[1::2]):
if data != None:
dst_pipe.restore(key, ttl if ttl > 0 else 0, data, replace=replace)
# Remove the key if remove option is on
if remove:
src_pipe.delete(key)
src_pipe.execute()
else:
non_existing += 1
# We execute the destination pipeline.
dst_result = dst_pipe.execute(False)
# We analyze the results of the destination pipeline execution.
for key, result in zip(keys, dst_result):
if result != b'OK':
e = result
if (
hasattr(e, 'args')
and e.args[0] in (
'BUSYKEY Target key name already exists.',
'Target key name is busy.'
)
):
already_existing += 1
else:
print("Key failed:", key, repr(data), repr(result))
raise e
return non_existing, already_existing
@click.command()
@click.argument('src_host')
@click.option('--src_port', default=DEFAULT_REDIS_PORT, help='Source port (default: %d)' % DEFAULT_REDIS_PORT)
@click.option('--src_db', default=DEFAULT_REDIS_DB, help='Source db number (default: %d)' % DEFAULT_REDIS_DB)
@click.option('--src_ssl', default=False, is_flag=True, help='SSL connection to source? (default: False)')
@click.option('--src_pass', default=None, help='Source password (default: None)')
@click.argument('dst_host')
@click.option('--dst_port', default=DEFAULT_REDIS_PORT, help='Destination port (default: %d)' % DEFAULT_REDIS_PORT)
@click.option('--dst_db', default=DEFAULT_REDIS_DB, help='Destination db number (default: %d)' % DEFAULT_REDIS_DB)
@click.option('--dst_ssl', default=False, is_flag=True, help='SSL connection to destination? (default: False)')
@click.option('--dst_pass', default=None, help='Destination password (default: None)')
@click.option('--flush', default=False, is_flag=True, help='Flush destination before migration? (default: False)')
@click.option('--pattern', default='*', help='Pattern of key names to migrate (default: *)')
@click.option('--replace', default=False, is_flag=True, help='Replace existing keys in destination? (default: False)')
@click.option('--remove', default=False, is_flag=True, help='Remove existing keys from source? (default: False)')
def migrate(
src_host, src_port, src_pass, src_ssl, src_db,
dst_host, dst_port, dst_pass, dst_ssl, dst_db,
pattern, flush, replace, remove
):
print("Connecting to Redis instances...")
src = redis.StrictRedis(host=src_host, port=src_port, ssl=src_ssl, db=src_db, password=src_pass)
dst = redis.StrictRedis(host=dst_host, port=dst_port, ssl=dst_ssl, db=dst_db, password=dst_pass)
# Flush destination database?
if flush:
dst.flushdb()
print("Counting keys to migrate...")
num_keys = len(src.keys(pattern))
if num_keys == 0:
print("No keys found, exiting.")
return
progress_widgets = ['%d keys: ' % num_keys, Percentage(), ' ', Bar(), ' ', ETA()]
progress_bar = ProgressBar(widgets=progress_widgets, maxval=num_keys).start()
BATCH_SIZE = 100
cursor = 0
non_existing = 0
already_existing = 0
keys = []
src_pipe = src.pipeline()
# We loop over the keys matching the specified pattern:
for key in src.scan_iter(match=pattern):
cursor += 1
keys.append(key)
src_pipe.pttl(key)
src_pipe.dump(key)
# We have iterated enough for a migration batch.
if cursor % BATCH_SIZE == 0:
# We copy that batch of keys...
non_existing, already_existing = _copy_keys(keys, src_pipe, dst, replace, non_existing, already_existing)
# ... and we reset the source containers to continue iterating.
keys = []
src_pipe = src.pipeline()
progress_bar.update(min(num_keys, cursor))
# Iteration is now over, we need to restore the last partial batch.
non_existing, already_existing = _copy_keys(keys, src_pipe, dst, replace, non_existing, already_existing, remove)
progress_bar.finish()
print("Keys disappeared on source during scan:", non_existing)
print("Keys already existing on destination:", already_existing)
if __name__ == '__main__':
migrate()
@SikhNerd
Copy link

I've forked this and fixed a minor bug in one of the function calls: https://gist.github.com/SikhNerd/d9e7f0e5cb6462e4ec6bde5be373f5a7

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment