Last active
April 15, 2025 20:36
-
-
Save ervwalter/5ff6632c930c27a1eb6b07c986d7439b to your computer and use it in GitHub Desktop.
Migrate files in cephfs to a new file layout pool recursivly
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import shutil | |
import logging | |
import sys | |
from concurrent.futures import ThreadPoolExecutor | |
import threading | |
import uuid | |
import xattr | |
from pathlib import Path | |
start_directory = '.' # current directory | |
scratch_directory = '.scratch' | |
max_parallel_threads = 4 | |
def has_ceph_pool_attr(file_path, pool_value): | |
""" Check if the file has the specified ceph pool attribute value using xattr. """ | |
try: | |
attributes = xattr.xattr(file_path) | |
ceph_pool = attributes.get('ceph.file.layout.pool').decode('utf-8') | |
return ceph_pool == pool_value | |
except (IOError, KeyError): | |
# IOError for inaccessible files, KeyError if the attribute does not exist | |
return False | |
def process_file(file_path, scratch_dir, uid, gid, ceph_pool_value, hard_links, lock): | |
""" Process each file in a separate thread, appending a unique identifier to filenames to avoid overwrites. """ | |
try: | |
if has_ceph_pool_attr(file_path, ceph_pool_value): | |
logging.debug(f"Skipping file with specified ceph pool attribute: {file_path}") | |
return | |
logging.info(f"Processing file: {file_path}") | |
# after replacing file, parent folder atime and mtime are modified | |
# keep them to replace them | |
parent_path = Path(file_path).parent.absolute() | |
parent_stat_info = os.stat(parent_path, follow_symlinks=False) | |
parent_mtime = parent_stat_info.st_mtime | |
parent_atime = parent_stat_info.st_atime | |
# Generate a unique identifier and append it to the filename | |
unique_suffix = uuid.uuid4().hex | |
scratch_file_name = f"{os.path.basename(file_path)}_{unique_suffix}" | |
scratch_file_path = os.path.join(scratch_dir, scratch_file_name) | |
stat_info = os.stat(file_path, follow_symlinks=False) | |
inode = stat_info.st_ino | |
nlink = stat_info.st_nlink | |
if nlink > 1 or inode in hard_links: | |
with lock: | |
if inode in hard_links: | |
os.remove(file_path) | |
os.link(hard_links[inode], file_path) | |
logging.info(f"Hard link recreated for file: {file_path}") | |
return | |
else: | |
logging.info(f"Hard link added to list for file: {file_path}") | |
hard_links[inode] = file_path | |
if os.path.islink(file_path): | |
link_target = os.readlink(file_path) | |
os.unlink(file_path) | |
os.symlink(link_target, file_path) | |
os.lchown(file_path, uid, gid) | |
else: | |
shutil.copy2(file_path, scratch_file_path) | |
shutil.copystat(file_path, scratch_file_path) | |
os.remove(file_path) | |
shutil.move(scratch_file_path, file_path) | |
os.chown(file_path, uid, gid) | |
# update parent atime and mtime | |
os.utime(parent_path, (parent_atime, parent_mtime)) | |
except Exception as e: | |
logging.error(f"Error processing {file_path}: {e}") | |
def handler(future): | |
future.result() | |
return | |
def process_files(start_dir, scratch_dir, ceph_pool_value): | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
if not os.path.exists(scratch_dir): | |
os.makedirs(scratch_dir) | |
hard_links = {} | |
lock = threading.Lock() | |
with ThreadPoolExecutor(max_workers=max_parallel_threads) as executor: | |
for root, dirs, files in os.walk(start_dir): | |
dirs.sort() | |
files.sort() | |
for file in files: | |
file_path = os.path.join(root, file) | |
if scratch_dir in file_path: | |
continue | |
stat_info = os.stat(file_path, follow_symlinks=False) | |
uid = stat_info.st_uid | |
gid = stat_info.st_gid | |
future = executor.submit(process_file, file_path, scratch_dir, uid, gid, ceph_pool_value, hard_links, lock) | |
future.add_done_callback(handler) | |
if os.path.exists(scratch_dir): | |
shutil.rmtree(scratch_dir) | |
if __name__ == "__main__": | |
if len(sys.argv) != 2: | |
print("Usage: python script.py <ceph_pool_value>") | |
sys.exit(1) | |
ceph_pool_value = sys.argv[1] | |
process_files(start_directory, scratch_directory, ceph_pool_value) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
also to add to consistency one could also check mtime and ctime with some small intervals after the file is moved to see if there is any application/user (that doesnt respect locks) on another client actively writing to the file and move it back and try again later if that is the case