-
-
Save mainframe/0a9a96692e1d1eaa98f2bcf58bab27c4 to your computer and use it in GitHub Desktop.
Multi threaded RBD copy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Python script to multi-threaded copy a RBD image from one cluster to another | |
This script requires a configuration file and a RBD image to copy. | |
It will copy the RBD image from the source pool to destination pool as | |
specified in the configuration file. | |
It assumes the destination image already exists and is at least the size of the | |
source image. | |
Example config (rbd-export-import.conf): | |
[source] | |
pool = rbd | |
mon_host = 2001:db8::100 | |
id = admin | |
key = AQAI1s5XK0q0ChAA9FDEBluyt4E0OI26oGVsGQ== | |
[destination] | |
pool = rbd | |
mon_host = 2001:db8::200 | |
id = admin | |
key = AQCuASJXa27KDhAAg3Ww25Jn/ZLmlDcwhVLNXg== | |
Author: Wido den Hollander <[email protected]> | |
""" | |
import rbd | |
import rados | |
import argparse | |
import logging | |
import sys | |
import threading | |
import time | |
import Queue | |
from ConfigParser import ConfigParser | |
# Logger used by every function in this script. It is the root logger, which
# the __main__ section configures via logging.basicConfig.
LOGGER = logging.getLogger()

# The source image is split into segments of this many bytes (1 GiB); each
# segment becomes one work item on the copy queue.
SEGMENT_SIZE = 1024 * 1024 * 1024
def rbd_copy_worker(i, q, source, dest, chunk_size, errors=None):
    """Copy the segments taken from *q* from *source* to *dest*, forever.

    Intended as the target of a daemon thread: the loop never exits, and the
    thread blocks in ``q.get()`` once the queue has been drained.

    :param i: worker number, only used in log messages
    :param q: queue of ``{'offset': ..., 'length': ...}`` dicts, one per
        byte range (segment) to copy
    :param source: image read through ``read(offset, length)``
    :param dest: image written through ``write(data, offset)``
    :param chunk_size: maximum number of bytes per read/write call
    :param errors: optional list; the exception raised while copying a
        segment is appended to it so the caller can detect a failed copy
    """
    while True:
        chunk = q.get()
        try:
            _copy_segment(i, source, dest, chunk['offset'], chunk['length'],
                          chunk_size)
        except Exception as exc:
            LOGGER.exception('Worker %d failed to copy %r', i, chunk)
            if errors is not None:
                errors.append(exc)
        finally:
            # Always acknowledge the work item. Without task_done(), the
            # caller's Queue.join() would block forever after a failure.
            q.task_done()


def _copy_segment(i, source, dest, offset, length, chunk_size):
    """Copy bytes ``[offset, offset + length)`` in pieces of at most *chunk_size*.

    :raises IOError: if the source returns no data before the segment is done
    """
    LOGGER.debug('Worker %d offset: %d length: %d', i, offset, length)
    end = offset + length
    while offset < end:
        # Clamp the read to this segment so a worker never re-copies bytes
        # that belong to the next segment (handled by another worker).
        data = source.read(offset, min(chunk_size, end - offset))
        if not data:
            # An empty read would leave offset unchanged and spin forever.
            raise IOError('Empty read from source at offset %d' % offset)
        dest.write(data, offset)
        offset += len(data)


def _segments(size, segment_size):
    """Yield non-overlapping ``(offset, length)`` pairs that tile ``[0, size)``.

    Every segment is *segment_size* bytes long except possibly the last one,
    which holds the remaining ``size - offset`` bytes.
    """
    offset = 0
    while offset < size:
        length = min(segment_size, size - offset)
        yield offset, length
        offset += length


def create_rados_connection(mon_host, rados_id, key):
    """Create, configure and connect a RADOS cluster handle.

    :param mon_host: monitor address(es) of the cluster
    :param rados_id: client id to authenticate as
    :param key: secret key for *rados_id*
    :returns: the connected ``rados.Rados`` handle; the caller must call
        ``shutdown()`` on it
    """
    conn = rados.Rados(rados_id=rados_id)
    try:
        conn.conf_set('mon_host', mon_host)
        conn.conf_set('key', key)
        conn.connect()
    except Exception:
        # Do not leak the handle when configuring or connecting fails.
        conn.shutdown()
        raise
    return conn


def _copy_image(source_rbd, dest_rbd, size, workers, chunk_size):
    """Copy the first *size* bytes of *source_rbd* to *dest_rbd* with threads.

    :raises RuntimeError: if at least one segment failed to copy
    """
    LOGGER.info('Will use %d byte chunks to read and write', chunk_size)
    LOGGER.info('Splitting up into %d sized segments', SEGMENT_SIZE)

    errors = []
    chunk_queue = Queue.Queue()
    for i in range(workers):
        worker = threading.Thread(target=rbd_copy_worker,
                                  args=(i, chunk_queue, source_rbd, dest_rbd,
                                        chunk_size, errors))
        # Workers loop forever; daemon threads do not keep the process alive.
        worker.daemon = True
        worker.start()

    for offset, length in _segments(size, SEGMENT_SIZE):
        chunk_queue.put({'offset': offset, 'length': length})

    LOGGER.info('Waiting for Queue to be empty')
    chunk_queue.join()

    if errors:
        raise RuntimeError('%d segment(s) failed to copy; destination image '
                           'is incomplete' % len(errors))


def main(config, img_source, img_dest, workers, chunk_size):
    """Copy RBD image *img_source* onto the existing image *img_dest*.

    Monitor host, client id, key and pool of both clusters are read from
    the ``[source]`` and ``[destination]`` sections of *config*.

    :param config: parsed configuration (``ConfigParser``)
    :param img_source: name of the image to read
    :param img_dest: name of the already existing image to overwrite
    :param workers: number of copy threads (at least 1)
    :param chunk_size: bytes per read/write call (at least 1)
    :raises ValueError: on invalid *workers*/*chunk_size*, or when the
        destination image is smaller than the source image
    :raises RuntimeError: if any segment failed to copy
    """
    if workers < 1:
        raise ValueError('workers must be at least 1, got %r' % (workers,))
    if chunk_size < 1:
        raise ValueError('chunk_size must be at least 1, got %r'
                         % (chunk_size,))

    # Handles are tracked so the finally clause releases whatever was opened,
    # even when a later step fails.
    source = dest = None
    source_io = dest_io = None
    source_rbd = dest_rbd = None
    try:
        source = create_rados_connection(config.get('source', 'mon_host'),
                                         config.get('source', 'id'),
                                         config.get('source', 'key'))
        dest = create_rados_connection(config.get('destination', 'mon_host'),
                                       config.get('destination', 'id'),
                                       config.get('destination', 'key'))

        LOGGER.info('Spawning %d workers to copy %s to %s', workers,
                    img_source, img_dest)

        LOGGER.debug('Creating RADOS IoCTX')
        source_io = source.open_ioctx(config.get('source', 'pool'))
        dest_io = dest.open_ioctx(config.get('destination', 'pool'))

        LOGGER.debug('Opening RBD images')
        source_rbd = rbd.Image(source_io, img_source, read_only=True)
        dest_rbd = rbd.Image(dest_io, img_dest)

        size = source_rbd.size()
        LOGGER.info('Size of source image is %d', size)
        if dest_rbd.size() < size:
            raise ValueError('Destination image is smaller than source')

        _copy_image(source_rbd, dest_rbd, size, workers, chunk_size)
    finally:
        LOGGER.debug('Closing RBD images')
        for image in (source_rbd, dest_rbd):
            if image is not None:
                image.close()

        LOGGER.debug('Closing RADOS IoCTX')
        for ioctx in (source_io, dest_io):
            if ioctx is not None:
                ioctx.close()

        LOGGER.debug('Closing RADOS connections')
        for conn in (source, dest):
            if conn is not None:
                conn.shutdown()
if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

    parser = argparse.ArgumentParser(description='Ceph RBD import/export')
    parser.add_argument('--config', action='store', dest='conffile',
                        default='rbd-export-import.conf',
                        help='Configuration file')
    # Both image names are mandatory; without them main() would fail later
    # with a much less helpful error from librbd.
    parser.add_argument('--source-image', action='store', dest='img_source',
                        required=True, help='The source image')
    parser.add_argument('--dest-image', action='store', dest='img_dest',
                        required=True, help='The destination image')
    # argparse requires a callable for ``type``: the string 'int' is rejected
    # with ValueError as soon as the argument is registered.
    parser.add_argument('--workers', action='store', dest='workers', type=int,
                        help='Number of worker threads to run', default=10)
    parser.add_argument('--chunk-size', action='store', dest='chunk_size',
                        type=int, default=262144,
                        help='Number of bytes per read/write call')
    args = parser.parse_args()

    conf = ConfigParser()
    # Close the configuration file once it has been parsed.
    with open(args.conffile) as conffile:
        conf.readfp(conffile)

    main(conf, args.img_source, args.img_dest, args.workers, args.chunk_size)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment